periodic_supplier.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 352 行
CPP
352 行
// Periodic_Supplier.cpp,v 1.6 2003/08/24 13:50:14 jwillemsen Exp
#include "Periodic_Supplier.h"
#include "ace/Arg_Shifter.h"
#include "ace/High_Res_Timer.h"
#include "ace/Barrier.h"
#include "tao/debug.h"
#include "tao/ORB_Core.h"
#include "orbsvcs/Time_Utilities.h"
#include "StructuredEvent.h"
#include "Task_Stats.h"
#include "Task_Callback.h"
#include "LookupManager.h"
#include "Priority_Mapping.h"
ACE_RCSID(RT_Notify, TAO_Notify_Tests_Periodic_Supplier, "Periodic_Supplier.cpp,v 1.6 2003/08/24 13:50:14 jwillemsen Exp")
/// Default constructor.
///
/// Every numeric setting and counter starts at zero, and neither a
/// start barrier nor a completion callback is attached yet (both
/// pointers are null).  The real values are filled in later by
/// init_state (), task_callback () and activate_task ().
TAO_Notify_Tests_Periodic_Supplier::TAO_Notify_Tests_Periodic_Supplier ()
  : barrier_ (0)
  , priority_ (0)
  , period_ (0)
  , total_deadlines_missed_ (0)
  , run_time_ (0)
  , exec_time_ (0)
  , phase_ (0)
  , iter_ (0)
  , load_ (0)
  , client_ (0)
{
  // Nothing else to do: all state is set up by the initializer list.
}
/// Destructor.
///
/// The body is empty: the barrier_ and client_ pointers are not
/// deleted here.
// NOTE(review): presumably both objects are owned by the caller -- confirm.
TAO_Notify_Tests_Periodic_Supplier::~TAO_Notify_Tests_Periodic_Supplier ()
{
}
/// Registers the callback object to be notified when this supplier
/// finishes its run.
///
/// @param client  Callback to store; may be null, in which case no
///                notification is delivered (handle_svc () checks the
///                pointer before calling done ()).
void
TAO_Notify_Tests_Periodic_Supplier::task_callback (TAO_Notify_Tests_Task_Callback* client)
{
  // Only the pointer is remembered; nothing is invoked at this point.
  client_ = client;
}
/// Parses this supplier's options from @a arg_shifter.
///
/// The base class gets the first chance to consume options.  After
/// that the following options are recognised:
///   -EventType <type>            event type, applied to both the regular
///                                event and the 0th (prologue) event
///   -FilterLongData <name> <val> adds a CORBA::Long filterable field
///   -Priority <n>                stored and also set as the event's
///                                CosNotification::Priority QoS
///   -Period <n>                  period (reported as uS in the debug output)
///   -ExecTime <n>                execution time (reported as uS)
///   -Phase <n>                   phase (reported as uS)
///   -Iter <n>                    iteration count; also sizes the stats buffer
///   -Load <n>                    load value
///   -RunTime <n>                 run time, in seconds
///
/// Parsing stops at the first option that is not recognised; that is
/// not treated as an error.
///
/// @param arg_shifter  Argument cursor positioned at this supplier's options.
/// @return 0 on success; -1 if the base class fails, if the stats buffer
///         cannot be initialised, or if -FilterLongData is not followed
///         by both a name and a value.
int
TAO_Notify_Tests_Periodic_Supplier::init_state (ACE_Arg_Shifter& arg_shifter)
{
  // First, let the base class look for options.
  if (TAO_Notify_Tests_StructuredPushSupplier::init_state (arg_shifter) == -1)
    return -1;

  const ACE_TCHAR *current_arg = 0;

  while (arg_shifter.is_anything_left ())
    {
      if ((current_arg = arg_shifter.get_the_parameter ("-EventType")))
        {
          this->event_.type ("*", current_arg);
          zeroth_event.type ("*", current_arg);
          arg_shifter.consume_arg ();
        }
      else if (arg_shifter.cur_arg_strncasecmp ("-FilterLongData") == 0) // -FilterLongData name value
        {
          // Skip the flag itself.
          arg_shifter.consume_arg ();

          // get_current () yields a null pointer once the arguments are
          // exhausted, so make sure the name is really there.
          if (!arg_shifter.is_anything_left ())
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "-FilterLongData: missing field name\n"),
                                -1);
            }

          ACE_CString name = arg_shifter.get_current ();
          arg_shifter.consume_arg ();

          // Same check for the value: atoi on a null pointer would crash.
          if (!arg_shifter.is_anything_left ())
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "-FilterLongData: missing value for field %s\n",
                                 name.c_str ()),
                                -1);
            }

          CORBA::Long value = (CORBA::Long) ACE_OS::atoi (arg_shifter.get_current ());
          arg_shifter.consume_arg ();

          CORBA::Any buffer;
          buffer <<= value;

          this->event_.filter (name.c_str (), buffer);
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-Priority")))
        {
          priority_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();

          // Mirror the priority into the event's QoS properties.
          CORBA::Any buffer;
          buffer <<= (CORBA::Short) this->priority_;
          this->event_.qos (CosNotification::Priority, buffer);
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-Period")))
        {
          period_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-ExecTime")))
        {
          exec_time_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-Phase")))
        {
          phase_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-Iter")))
        {
          iter_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();

          // The stats object is initialised with the iteration count.
          if (stats_.init (iter_) == -1)
            return -1;
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-Load")))
        {
          load_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();
        }
      else if ((current_arg = arg_shifter.get_the_parameter ("-RunTime"))) // in seconds
        {
          run_time_ = ACE_OS::atoi (current_arg);
          arg_shifter.consume_arg ();
        }
      else
        {
          // Not one of ours: report it, optionally dump what was parsed
          // so far, and stop without consuming the argument.
          ACE_DEBUG ((LM_DEBUG, "parse Task unknown option %s\n",
                      arg_shifter.get_current ()));
          // NOTE(review): period_, exec_time_ and phase_ are printed with
          // %ld elsewhere in this file but with %d here -- confirm the
          // member types and align the directives.
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG, "event type %s, priority %d, period %duS, exec_time %duS, phase %duS, iter %d, load %d\n",
                        event_.type(), priority_, period_, exec_time_, phase_, iter_, load_));
          break;
        }
    }

  return 0;
}
/// Turns this supplier into an active object running svc () in one thread.
///
/// The thread creation flags are THR_NEW_LWP | THR_JOINABLE combined
/// with the flags configured on the ORB.  The thread priority is this
/// supplier's priority_ translated by the priority mapping obtained
/// from the lookup manager.
///
/// @param barrier  Barrier the new thread waits on before it starts
///                 sending; stored in barrier_.
/// @return 0 when the thread was activated, -1 on any failure.
int
TAO_Notify_Tests_Periodic_Supplier::activate_task (ACE_Barrier* barrier)
{
  barrier_ = barrier;

  long flags = THR_NEW_LWP | THR_JOINABLE;

  // Resolve the ORB
  CORBA::ORB_var orb;
  LOOKUP_MANAGER->resolve (orb);

  // Do not dereference a nil ORB reference.
  if (CORBA::is_nil (orb.in ()))
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Periodic Supplier: unable to resolve the ORB.\n")),
                        -1);
    }

  flags |=
    orb->orb_core ()->orb_params ()->thread_creation_flags ();

  // Start from a null pointer so a failed lookup is detectable instead
  // of leaving an indeterminate value behind.
  TAO_Notify_Tests_Priority_Mapping* priority_mapping = 0;
  LOOKUP_MANAGER->resolve (priority_mapping);

  if (priority_mapping == 0)
    {
      ACE_ERROR_RETURN ((LM_ERROR,
                         ACE_TEXT ("Periodic Supplier: unable to resolve the priority mapping.\n")),
                        -1);
    }

  // Initialised so that the value is defined even if the mapping does
  // not write to it.
  CORBA::Short native_prio = 0;
  priority_mapping->to_native (this->priority_, native_prio);

  // Become an active object.
  if (this->ACE_Task <ACE_SYNCH>::activate (flags,
                                            1,
                                            0,
                                            native_prio) == -1)
    {
      if (ACE_OS::last_error () == EPERM)
        ACE_ERROR_RETURN ((LM_ERROR,
                           ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
                          -1);
      else
        ACE_DEBUG ((LM_ERROR,
                    ACE_TEXT ("(%t) Task activation at priority %d failed, ")
                    ACE_TEXT ("exiting!\n%a"),
                    this->priority_,
                    -1));

      // The %a directive above is expected to end the process.  If
      // control still gets here, report the failure rather than falling
      // through to the success path below.
      return -1;
    }

  ACE_DEBUG ((LM_ERROR, "Activated Periodic Supplier Thread at priority %d\n", this->priority_));
  return 0;
}
/// Sends a fixed batch of ten copies of the regular event before the
/// measured run starts.
///
/// Sending stops early if ACE_CHECK detects an exception after a send.
void
TAO_Notify_Tests_Periodic_Supplier::send_warmup_events (ACE_ENV_SINGLE_ARG_DECL)
{
  const int warmup_total = 10;

  int sent = 0;
  while (sent < warmup_total)
    {
      this->send_event (this->event_.event () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
      ++sent;
    }
}
/// Sends the "0th" event that precedes the measured run.
///
/// The event's optional header carries three fields:
///   "BaseTime" - the shared base time converted to TimeBase::TimeT,
///   "MaxCount" - this supplier's iteration count (iter_),
///   "Load"     - this supplier's load value (load_).
void
TAO_Notify_Tests_Periodic_Supplier::send_prologue (ACE_ENV_SINGLE_ARG_DECL)
{
  // populate event.

  // send the base time and max count.
  TimeBase::TimeT base_time;
  ORBSVCS_Time::hrtime_to_TimeT (base_time,
                                 BASE_TIME::instance ()->base_time_);

  // The same Any is reused for each header field in turn.
  CORBA::Any buffer;
  buffer <<= base_time;
  zeroth_event.opt_header ("BaseTime", buffer);

  buffer <<= this->iter_;
  zeroth_event.opt_header ("MaxCount", buffer);

  buffer <<= this->load_;
  zeroth_event.opt_header ("Load", buffer);

  // The format string has a %s directive, so the supplier name must be
  // passed; omitting it makes the logger read a missing argument.
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "(%P, %t) Supplier (%s) sending event 0th event\n",
                this->name_.c_str ()));

  this->send_event (zeroth_event.event () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
/// Runs the measured part of the supplier: sends the prologue event and
/// then up to iter_ time-stamped events.
///
/// For every iteration:
///   - the current high-resolution time is stored as the event payload;
///   - if run_time_ is non-zero and more than run_time_ seconds have
///     passed since the shared base time, a "Stop" optional header is
///     added and the loop is made to end after this event;
///   - the send is timed and the sample is recorded in stats_;
///   - if period_ is non-zero, the supplier either sleeps for the rest
///     of the period or, when the send took longer than the period,
///     counts the missed deadlines and starts the next iteration at once.
///
/// When the loop ends the end time is recorded and the registered
/// callback, if any, is told that this supplier is done.
void
TAO_Notify_Tests_Periodic_Supplier::handle_svc (ACE_ENV_SINGLE_ARG_DECL)
{
  this->send_prologue (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_hrtime_t before, after;
  // Named send_time rather than time_t to avoid shadowing the standard
  // typedef of that name.
  TimeBase::TimeT send_time;

  CORBA::Any buffer;

  ACE_hrtime_t base_time = BASE_TIME::instance ()->base_time_;

  for (int i = 0; i < iter_ ; ++i)
    {
      before = ACE_OS::gethrtime ();

      // Payload is the time stamp taken just before sending.
      ORBSVCS_Time::hrtime_to_TimeT (send_time,
                                     before);

      buffer <<= send_time;

      this->event_.payload (buffer);

      if (this->run_time_ != 0 &&
          Task_Stats::diff_sec (base_time, before) > this->run_time_)
        {
          // Time up, send a "Stop" event.
          buffer <<= (CORBA::Long) 1;
          this->event_.opt_header ("Stop", buffer);

          i = iter_;  // Load the iter so that the loop exits.
        }

      if (TAO_debug_level > 0)
        ACE_DEBUG ((LM_DEBUG, "(%P, %t) Supplier (%s) sending event #%d\n",
                    this->name_.c_str (), i));

      this->send_event (this->event_.event () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      after = ACE_OS::gethrtime ();

      stats_.sample (before, after);

      if (period_ != 0) // blast mode, no sleep.
        {
          ACE_UINT32 elapsed_microseconds =
            Task_Stats::diff_usec (before, after);

          // did we miss any deadlines?
          int missed =
            (int)elapsed_microseconds > period_ ? elapsed_microseconds/period_ : 0;

          this->total_deadlines_missed_ += missed;

          /* Start -- "Immediate run if last call missed deadline" */
          if (missed > 0) // if we missed
            continue;

          long sleep_time = period_ - elapsed_microseconds;
          /* End -- "Immediate run if last call missed deadline" */

          // Alternative kept from the original for reference:
          //   This logic sleeps till the next period.
          //   So, if we missed a deadline we wait.
          //
          //   long sleep_time = (missed + 1)*period_ ;
          //   sleep_time -= elapsed_microseconds;

          // sleep_time is a long but the %d directive consumes an int;
          // pass an int explicitly.  The value lies in [0, period_] here.
          if (TAO_debug_level > 0)
            ACE_DEBUG ((LM_DEBUG, "(%t) sleep time = %d uSec, missed %d deadlines\n",
                        (int) sleep_time, missed));

          ACE_Time_Value t_sleep (0, sleep_time);
          ACE_OS::sleep (t_sleep);
        } /* period != 0 */
    } /* for */

  stats_.end_time (ACE_OS::gethrtime ());

  if (this->client_)
    this->client_->done (this);
}
/// Thread entry point of the active object.
///
/// Sequence: send the warm-up events, wait on the start barrier (if one
/// was supplied), record the shared base time in stats_, sleep for
/// phase_ microseconds, then run handle_svc ().  CORBA user and system
/// exceptions raised along the way are printed and swallowed.
///
/// @return Always 0.
int
TAO_Notify_Tests_Periodic_Supplier::svc (void)
{
  if (TAO_debug_level > 0)
    ACE_DEBUG ((LM_DEBUG, "Thread_Task (%t) - wait\n"));

  ACE_TRY_NEW_ENV
    {
      // First, send warmup events.
      this->send_warmup_events (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Next, wait for other threads.  barrier_ starts out null and is
      // set from a caller-supplied pointer, so guard the dereference;
      // without a barrier there is nobody to wait for.
      if (this->barrier_ != 0)
        this->barrier_->wait ();

      // first thread here inits the Base_Time.
      stats_.base_time (BASE_TIME::instance ()->base_time_);

      // now wait till the phase_ period expires.
      ACE_OS::sleep (ACE_Time_Value (0, phase_));

      this->handle_svc (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCH (CORBA::UserException, ue)
    {
      ACE_PRINT_EXCEPTION (ue,
                           "Periodic supplier: error sending event. ");
    }
  ACE_CATCH (CORBA::SystemException, se)
    {
      ACE_PRINT_EXCEPTION (se,
                           "Periodic supplier: error sending event. ");
    }
  ACE_ENDTRY;

  return 0;
}
/// Writes this supplier's collected samples to "<name>.dat".
///
/// A one-line header describing the supplier's configuration and the
/// number of missed deadlines is passed to the stats object together
/// with the file name.
///
/// @param msg           Text placed at the start of the header line.
/// @param dump_samples  Forwarded unchanged to stats_.dump_samples ().
void
TAO_Notify_Tests_Periodic_Supplier::dump_stats (ACE_TCHAR* msg, int dump_samples)
{
  // Build the file name with the string class so its length is not
  // limited by the size of the header buffer below.
  ACE_CString fname (this->name_.c_str ());
  fname += ".dat";

  // The header contains caller-supplied and configured strings of
  // arbitrary length, so format it with an explicit bound and force
  // termination in case the output was truncated.
  char buf[BUFSIZ];
  ACE_OS::snprintf (buf, sizeof buf,
                    "%s# : Supplier Name = %s, Proxy ID = %d, Event Type = %s, priority %d, period %ld, exec_time %ld, phase %ld, iter_ %d , load_ %d, deadlines missed = %d\n",
                    msg, this->name_.c_str (), this->proxy_id_, this->event_.type (), priority_, period_, exec_time_, phase_, iter_, load_, this->total_deadlines_missed_);
  buf[sizeof buf - 1] = '\0';

  stats_.dump_samples (fname.c_str (), buf, dump_samples);
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?