📄 throughput.cpp
字号:
// Throughput.cpp,v 1.20 2003/07/06 21:41:58 pradeep Exp
#include "Throughput.h"
#include "ace/Arg_Shifter.h"
#include "ace/Get_Opt.h"
#include "ace/Synch.h"
#include "ace/OS.h"
#include "ace/Dynamic_Service.h"
#include "tao/Strategies/advanced_resource.h"
#include "tao/Messaging/Messaging.h"
#include "orbsvcs/Notify/Service.h"
#include "orbsvcs/Time_Utilities.h"
ACE_RCSID (Notify_Tests, Throughput, "Throughput.cpp,v 1.20 2003/07/06 21:41:58 pradeep Exp")
/***************************************************************************/
// Construct a consumer bound to the test driver that owns it.
//
// test_client is stored as-is (no ownership transfer is visible here); it is
// consulted in push_structured_event() for the expected per-consumer event
// count and notified through peer_done() once that count is reached.
// The received-event counter starts at zero.
Throughput_StructuredPushConsumer::Throughput_StructuredPushConsumer (
Notify_Throughput *test_client
)
: test_client_ (test_client),
push_count_ (0)
{
}
// Fold this consumer's collected statistics into the caller-supplied
// aggregate.  The consumer's own statistics are left untouched.
void
Throughput_StructuredPushConsumer::accumulate_into (
    ACE_Throughput_Stats &throughput) const
{
  throughput.accumulate (throughput_);
}
void
Throughput_StructuredPushConsumer::dump_stats (const char* msg,
ACE_UINT32 gsf)
{
this->throughput_.dump_results (msg, gsf);
}
// Upcall invoked for every structured event delivered to this consumer.
//
// The supplier stores its send time in filterable_data[0]; this method
// recovers it, counts the event and records one throughput/latency sample.
// When the expected per-consumer count is reached the owning test client is
// told that this peer is done.
//
// An event whose timestamp is missing or has the wrong type is still counted
// (so the test can terminate) but contributes no sample: feeding an
// uninitialized timestamp into the statistics would corrupt the results.
void
Throughput_StructuredPushConsumer::push_structured_event (
    const CosNotification::StructuredEvent & notification
    ACE_ENV_ARG_DECL_NOT_USED
  )
  ACE_THROW_SPEC ((CORBA::SystemException,
                   CosEventComm::Disconnected))
{
  // Extract payload.  The string itself is not used; the extraction only
  // verifies that the body has the expected type.
  const char* msg = 0;
  CORBA::Boolean ok = (notification.remainder_of_body >>= msg);

  if (!ok)
    ACE_DEBUG ((LM_DEBUG, "(%t) Error extracting message body\n"));

  // Recover the supplier-side timestamp.  Guard both the sequence index and
  // the Any extraction so a malformed event cannot cause an out-of-range
  // access or leave the timestamp uninitialized.
  TimeBase::TimeT Throughput_base_recorded = 0;
  CORBA::Boolean have_base = 0;

  if (notification.filterable_data.length () > 0)
    have_base =
      (notification.filterable_data[0].value >>= Throughput_base_recorded);

  if (!have_base)
    ACE_DEBUG ((LM_DEBUG,
                "(%t) Error extracting Throughput_base timestamp\n"));

  ACE_hrtime_t Throughput_base = 0;

  if (have_base)
    ORBSVCS_Time::TimeT_to_hrtime (Throughput_base,
                                   Throughput_base_recorded);

  // Everything below touches shared counters/statistics.
  ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);

  // The throughput clock starts with the first event received.
  if (this->push_count_ == 0)
    {
      this->throughput_start_ = ACE_OS::gethrtime ();
    }

  ++this->push_count_;

  // Grab timestamp again.
  ACE_hrtime_t now = ACE_OS::gethrtime ();

  // Record statistics, but only when a valid send time is available.
  if (have_base)
    this->throughput_.sample (now - this->throughput_start_,
                              now - Throughput_base);

  // Periodic progress report.
  if (this->push_count_%1000 == 0)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%P)(%t) event count = %d\n",
                  this->push_count_));
    }

  if (push_count_ == test_client_->perconsumer_count_)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "(%t)expected count reached\n"));
      test_client_->peer_done ();
    }
}
/***************************************************************************/
// Construct a supplier bound to the test driver that owns it.
//
// test_client is stored as-is; svc() reads the burst parameters and payload
// from it and calls its peer_done() when all bursts have been sent.
// push_count_ is initialised to zero.
Throughput_StructuredPushSupplier::Throughput_StructuredPushSupplier (
Notify_Throughput* test_client
)
:test_client_ (test_client),
push_count_ (0)
{
}
// Destructor: nothing to release explicitly here; test_client_ is not
// deleted by this class in the code shown.
Throughput_StructuredPushSupplier::~Throughput_StructuredPushSupplier ()
{
}
// Fold this supplier's collected statistics into the caller-supplied
// aggregate.  The supplier's own statistics are left untouched.
void
Throughput_StructuredPushSupplier::accumulate_into (
    ACE_Throughput_Stats &throughput) const
{
  throughput.accumulate (throughput_);
}
void
Throughput_StructuredPushSupplier::dump_stats (const char* msg,
ACE_UINT32 gsf)
{
this->throughput_.dump_results (msg, gsf);
}
int
Throughput_StructuredPushSupplier::svc (void)
{
// Initialize a time value to pace the test.
ACE_Time_Value tv (0, test_client_->burst_pause_);
// Operations.
CosNotification::StructuredEvent event;
// EventHeader
// FixedEventHeader
// EventType
// string
event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
// string
event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
// string
event.header.fixed_header.event_name = CORBA::string_dup("myevent");
// OptionalHeaderFields
// PropertySeq
// sequence<Property>: string name, any value
CosNotification::PropertySeq& qos = event.header.variable_header;
qos.length (0); // put nothing here
// FilterableEventBody
// PropertySeq
// sequence<Property>: string name, any value
event.filterable_data.length (1);
event.filterable_data[0].name = CORBA::string_dup("Throughput_base");
event.remainder_of_body <<= test_client_->payload_;
ACE_DECLARE_NEW_CORBA_ENV;
this->throughput_start_ = ACE_OS::gethrtime ();
for (int i = 0; i < test_client_->burst_count_; ++i)
{
for (int j = 0; j < test_client_->burst_size_; ++j)
{
// Record current time.
ACE_hrtime_t start = ACE_OS::gethrtime ();
TimeBase::TimeT Throughput_base;
ORBSVCS_Time::hrtime_to_TimeT (Throughput_base,
start);
// Any.
event.filterable_data[0].value <<= Throughput_base;
this->proxy_->push_structured_event (event
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_hrtime_t end = ACE_OS::gethrtime ();
this->throughput_.sample (end - this->throughput_start_,
end - start);
}
ACE_OS::sleep (tv);
}
ACE_DEBUG ((LM_DEBUG, "(%P) (%t) Supplier done\n"));
test_client_->peer_done ();
return 0;
}
/***************************************************************************/
// Set up the default test configuration: one supplier and one consumer,
// a single burst of 1000 events, and a payload size of 1024.
//
// burst_pause_ is later passed as the second ACE_Time_Value argument in the
// supplier's svc() (the microseconds field of its (sec, usec) constructor).
// payload_, consumers_ and suppliers_ start out null; consumers_ and
// suppliers_ are allocated in init().
//
// NOTE(review): perconsumer_count_ and peer_done_count_ are computed from
// other members inside the initializer list.  That is only well defined if
// burst_size_, burst_count_, supplier_count_ and consumer_count_ are declared
// before them in the class, and lock_ before condition_ -- confirm against
// the header.
Notify_Throughput::Notify_Throughput (void)
: collocated_ec_ (0),
burst_count_ (1),
burst_pause_ (10000),
burst_size_ (1000),
payload_size_ (1024),
payload_ (0),
consumer_count_ (1),
supplier_count_ (1),
// Events each consumer expects: every event from every supplier.
perconsumer_count_ (burst_size_*burst_count_*supplier_count_),
consumers_ (0),
suppliers_ (0),
nthreads_ (1),
// Number of peers (consumers plus suppliers) taking part in the test.
peer_done_count_ (consumer_count_ + supplier_count_),
condition_ (lock_)
{
}
// Shut the ORB down (without waiting for completion) and release the
// payload buffer.
Notify_Throughput::~Notify_Throughput ()
{
  // The ORB reference is presumably only set up by init_ORB() during init().
  // If this object is destroyed before that happened (for example after a
  // failed argument parse), orb_ is nil and must not be dereferenced.
  if (!CORBA::is_nil (this->orb_.in ()))
    {
      ACE_DECLARE_NEW_CORBA_ENV;
      this->orb_->shutdown (0
                            ACE_ENV_ARG_PARAMETER);
    }

  // NOTE(review): the allocation of payload_ is not visible in this part of
  // the file.  If it is created with new[] this must become delete [] --
  // confirm against the allocation site.
  delete payload_;
}
// Prepare the whole test: initialise the ORB, configure the synchronisation
// policy (when CORBA Messaging is available), start the ORB worker threads,
// create the event channel with its admin objects, and finally create and
// connect every consumer and supplier.
//
// argc/argv are handed to the base class ORB initialisation.
// Returns 0 on success and -1 when an allocation fails or a CORBA call
// reports an exception through the ACE environment macros.  A failure to
// activate the worker threads is only logged.
int
Notify_Throughput::init (int argc, char* argv [] ACE_ENV_ARG_DECL)
{
  // Bring up the ORB through the base class.
  Notify_Test_Client::init_ORB (argc, argv ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

#if (TAO_HAS_CORBA_MESSAGING == 1)
  // Install an ORB-level SYNC_WITH_TARGET sync-scope policy override.
  CORBA::Object_var pm_object =
    orb_->resolve_initial_references ("ORBPolicyManager"
                                      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::PolicyManager_var pm =
    CORBA::PolicyManager::_narrow (pm_object.in ()
                                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::Any scope_value;
  scope_value <<= Messaging::SYNC_WITH_TARGET;

  CORBA::PolicyList overrides (1);
  overrides.length (1);
  overrides[0] =
    orb_->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
                         scope_value
                         ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  pm->set_policy_overrides (overrides,
                            CORBA::SET_OVERRIDE
                            ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
#else
  ACE_DEBUG ((LM_DEBUG,
              "CORBA Messaging disabled in this configuration,"
              " test may not be optimally configured\n"));
#endif /* TAO_HAS_MESSAGING */

  // Start the threads that run the ORB.  Failure is reported, not fatal.
  worker_.orb (this->orb_.in ());

  const int activation_status =
    worker_.activate (THR_NEW_LWP | THR_JOINABLE, this->nthreads_);

  if (activation_status != 0)
    ACE_ERROR ((LM_ERROR,
                "Cannot activate client threads\n"));

  // Create all participants: the channel first ...
  this->create_EC (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // ... then one supplier admin and one consumer admin.  The returned id
  // is not used afterwards, so the same variable serves both calls.
  CosNotifyChannelAdmin::AdminID adminid;

  supplier_admin_ =
    ec_->new_for_suppliers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));

  consumer_admin_ =
    ec_->new_for_consumers (this->ifgop_, adminid ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));

  // Pointer tables for the peers.
  ACE_NEW_RETURN (consumers_,
                  Throughput_StructuredPushConsumer*[this->consumer_count_],
                  -1);
  ACE_NEW_RETURN (suppliers_,
                  Throughput_StructuredPushSupplier*[this->supplier_count_],
                  -1);

  // Create, initialise and connect each consumer.
  for (int c = 0; c < this->consumer_count_; ++c)
    {
      ACE_NEW_RETURN (consumers_[c],
                      Throughput_StructuredPushConsumer (this),
                      -1);

      consumers_[c]->init (root_poa_.in ()
                           ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      consumers_[c]->connect (this->consumer_admin_.in ()
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }

  // Same for each supplier.  The base-class init() is named explicitly,
  // exactly as in the call being made here.
  for (int s = 0; s < this->supplier_count_; ++s)
    {
      ACE_NEW_RETURN (suppliers_[s],
                      Throughput_StructuredPushSupplier (this),
                      -1);

      suppliers_[s]->TAO_Notify_Tests_StructuredPushSupplier::init (
          root_poa_.in ()
          ACE_ENV_ARG_PARAMETER
        );
      ACE_CHECK_RETURN (-1);

      suppliers_[s]->connect (this->supplier_admin_.in ()
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
    }

  return 0;
}
int
Notify_Throughput::parse_args(int argc, char *argv[])
{
ACE_Arg_Shifter arg_shifter (argc, argv);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -