periodic_consumer.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 269 行
CPP
269 行
// Periodic_Consumer.cpp,v 1.5 2003/08/24 13:50:14 jwillemsen Exp
#include "Periodic_Consumer.h"
#include "ace/Arg_Shifter.h"
#include "ace/High_Res_Timer.h"
#include "tao/debug.h"
#include "orbsvcs/Time_Utilities.h"
#include "StructuredEvent.h"
#include "Task_Stats.h"
#include "Task_Callback.h"
#include "LookupManager.h"
#include "Priority_Mapping.h"
ACE_RCSID(RT_Notify, TAO_Notify_Tests_Periodic_Consumer, "Periodic_Consumer.cpp,v 1.5 2003/08/24 13:50:14 jwillemsen Exp")
int WARMUP_COUNT = 10;
TAO_Notify_Tests_Periodic_Consumer::TAO_Notify_Tests_Periodic_Consumer (void)
: count_ (-2)
, warmup_countdown_ (WARMUP_COUNT)
, max_count_ (-1)
, load_ (0)
, client_ (0)
, check_priority_ (0)
, stop_received_ (0)
{
}
TAO_Notify_Tests_Periodic_Consumer::~TAO_Notify_Tests_Periodic_Consumer ()
{
}
void
TAO_Notify_Tests_Periodic_Consumer::task_callback (TAO_Notify_Tests_Task_Callback* client)
{
this->client_ = client;
}
int
TAO_Notify_Tests_Periodic_Consumer::init_state (ACE_Arg_Shifter& arg_shifter)
{
// First, let the base class look for options.
if (TAO_Notify_Tests_StructuredPushConsumer::init_state (arg_shifter) == -1)
return -1;
const ACE_TCHAR *current_arg = 0;
while (arg_shifter.is_anything_left ())
{
if ((current_arg = arg_shifter.get_the_parameter ("-MaxCount")))
{
this->max_count_ = ACE_OS::atoi (current_arg);
arg_shifter.consume_arg ();
if (max_count_ == 0)
{
if (this->client_)
this->client_->done (this);
}
}
else if (arg_shifter.cur_arg_strncasecmp ("-Check_Priority") == 0)
{
this->check_priority_ = 1;
arg_shifter.consume_arg ();
}
else
{
break;
}
} /* while */
return 0;
}
void
TAO_Notify_Tests_Periodic_Consumer::handle_start_event (const CosNotification::PropertySeq& prop_seq)
{
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received inital (-1)th event \n", this->name_.c_str ()));
for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
{
if (ACE_OS::strcmp (prop_seq[i].name.in (), "BaseTime") == 0)
{
TimeBase::TimeT base_time;
ACE_hrtime_t base_time_hrtime;
prop_seq[i].value >>= base_time;
ORBSVCS_Time::TimeT_to_hrtime (base_time_hrtime, base_time);
stats_.base_time (base_time_hrtime);
}
// if max_count has not been already specified, get it from the supplier.
else if (this->max_count_ == -1 &&
ACE_OS::strcmp (prop_seq[i].name.in (), "MaxCount") == 0)
{
prop_seq[i].value >>= this->max_count_;
}
else if (ACE_OS::strcmp (prop_seq[i].name.in (), "Load") == 0)
{
prop_seq[i].value >>= this->load_;
}
}
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG, "(%P, %t) Maxcount = %d, Load = %d\n",
this->max_count_, this->load_));
}
}
void
TAO_Notify_Tests_Periodic_Consumer::check_priority (const CosNotification::PropertySeq& prop_seq)
{
// Check if the event carries a Priority.
int event_has_priority_set = 0;
CORBA::Short event_priority = 0;
for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
{
if (ACE_OS::strcmp (prop_seq[i].name.in (), CosNotification::Priority) == 0)
{
prop_seq[i].value >>= event_priority;
event_has_priority_set = 1;
break;
}
}
if (event_has_priority_set == 1)
{
// Confirm that the current thread is at the priority set in the event
ACE_hthread_t current;
ACE_Thread::self (current);
int priority;
if (ACE_Thread::getprio (current, priority) == -1)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO (%P|%t) - ")
ACE_TEXT (" ACE_Thread::get_prio\n")));
return ;
}
CORBA::Short native_priority = CORBA::Short (priority);
TAO_Notify_Tests_Priority_Mapping* priority_mapping;
LOOKUP_MANAGER->resolve (priority_mapping);
CORBA::Short corba_priority;
priority_mapping->to_CORBA (native_priority, corba_priority);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG,
"Periodic Consumer expected priority = %d, received priority = %d/%d (native/corba)\n",
event_priority, native_priority, corba_priority));
if (corba_priority != event_priority)
ACE_DEBUG ((LM_DEBUG,
"Error: Periodic Consumer expected priority = %d, received priority = %d\n",
event_priority, corba_priority));
}
}
void
TAO_Notify_Tests_Periodic_Consumer::push_structured_event (const CosNotification::StructuredEvent & notification ACE_ENV_ARG_DECL)
ACE_THROW_SPEC ((
CORBA::SystemException,
CosEventComm::Disconnected
))
{
ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
CORBA::INTERNAL ());
ACE_CHECK;
const CosNotification::PropertySeq& prop_seq =
notification.header.variable_header;
if (this->count_ == -2)
{
if (--warmup_countdown_ == 0)
this->count_ = -1;
return;
}
else if (this->count_ == -1)
{
this->handle_start_event (prop_seq);
if (this->max_count_ > 0)
this->stats_.init (this->max_count_);
this->count_ = 0;
return;
}
if (this->check_priority_)
{
this->check_priority (prop_seq);
}
if (TAO_debug_level > 0)
{
ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received %d event type (%s,%s) \n", this->name_.c_str (), this->count_,
notification.header.fixed_header.event_type.domain_name.in(),
notification.header.fixed_header.event_type.type_name.in()));
}
for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
{
if (ACE_OS::strcmp (prop_seq[i].name.in (), "Stop") == 0)
{
this->stop_received_ = 1;
}
}
TimeBase::TimeT send_time, now;
ACE_hrtime_t send_time_hrtime;
notification.remainder_of_body >>= send_time;
ORBSVCS_Time::TimeT_to_hrtime (send_time_hrtime, send_time);
now = ACE_OS::gethrtime ();
stats_.sample (send_time_hrtime, now);
// Eat CPU
static CORBA::ULong prime_number = 9619;
for (CORBA::ULong load = this->load_; load != 0; --load)
ACE::is_prime (prime_number,
2,
prime_number / 2);
// ---
if (++this->count_ >= this->max_count_ || this->stop_received_ == 1)
{
stats_.end_time (ACE_OS::gethrtime ());
if (this->client_)
this->client_->done (this);
if (TAO_debug_level > 0)
ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s done \n", this->name_.c_str ()));
}
}
void
TAO_Notify_Tests_Periodic_Consumer::dump_stats (ACE_TCHAR* msg, int dump_samples)
{
char buf[BUFSIZ];
ACE_OS::sprintf (buf, "%s.dat", this->name_.c_str ());
ACE_CString fname (buf);
ACE_OS::sprintf (buf,
"%s# Consumer Name = %s, Proxy ID = %d Load = %u\n",
msg,
this->name_.c_str (), this->proxy_id_, this->load_);
stats_.dump_samples (fname.c_str (), buf, dump_samples);
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?