redgreen_test.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 723 行 · 第 1/2 页
CPP
723 行
// -*- C++ -*- RedGreen_Test.cpp,v 1.13 2003/11/04 05:21:33 dhinton Exp
#include "RedGreen_Test.h"
#include "ace/Arg_Shifter.h"
#include "ace/Get_Opt.h"
#include "ace/OS_NS_unistd.h"
#include "orbsvcs/Time_Utilities.h"
#include "tao/debug.h"
ACE_RCSID(Notify, RedGreen_Test, "RedGreen_Test.cpp,v 1.13 2003/11/04 05:21:33 dhinton Exp")
#define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
#define NAMING_SERVICE_NAME "NameService"
#define DOMAIN_GREEN "DOMAIN_GREEN"
#define DOMAIN_RED "DOMAIN_RED"
#define TYPE_GREEN "TYPE_GREEN"
#define TYPE_RED "TYPE_RED"
ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0;
ACE_hrtime_t g_throughput_start_;
int
RedGreen_Test::parse_args (int argc,
char *argv[])
{
ACE_Arg_Shifter arg_shifter (argc, argv);
const char *current_arg = 0;
while (arg_shifter.is_anything_left ())
{
if ((current_arg = arg_shifter.get_the_parameter ("-burst_size")))
{
this->burst_size_ = ACE_OS::atoi (current_arg);
ACE_DEBUG ((LM_DEBUG,
"Burst size = %d\n",
burst_size_));
// The number of events to send/receive.
arg_shifter.consume_arg ();
}
else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0)
{
ACE_DEBUG((LM_DEBUG,
"usage: %s "
"-burst_size [count]\n",
argv[0],
argv[0]));
arg_shifter.consume_arg ();
return -1;
}
else
{
arg_shifter.ignore_arg ();
}
}
return 0;
}
RedGreen_Test::RedGreen_Test (void)
: burst_size_ (10),
nthreads_ (2)
{
// No-Op.
ifgop_ = CosNotifyChannelAdmin::OR_OP;
}
RedGreen_Test::~RedGreen_Test ()
{
if (!CORBA::is_nil (ec_.in ()))
{
this->ec_->destroy ();
}
}
void
RedGreen_Test::init (int argc,
char *argv []
ACE_ENV_ARG_DECL)
{
this->init_ORB (argc,
argv
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->resolve_naming_service (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->resolve_Notify_factory (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->create_EC (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->create_supplieradmin (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->create_consumeradmin (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->create_consumers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
this->create_suppliers (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
void
RedGreen_Test::run (ACE_ENV_SINGLE_ARG_DECL)
{
this->send_events (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
worker_.thr_mgr ()->wait ();
}
void
RedGreen_Test::done (void)
{
dump_results ();
worker_.done ();
}
void
RedGreen_Test::init_ORB (int argc,
char *argv []
ACE_ENV_ARG_DECL)
{
this->orb_ = CORBA::ORB_init (argc,
argv,
""
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
CORBA::Object_ptr poa_object =
this->orb_->resolve_initial_references("RootPOA"
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
if (CORBA::is_nil (poa_object))
{
ACE_ERROR ((LM_ERROR,
" (%P|%t) Unable to initialize the POA.\n"));
return;
}
this->root_poa_ =
PortableServer::POA::_narrow (poa_object
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
PortableServer::POAManager_var poa_manager =
root_poa_->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
worker_.orb (this->orb_.in ());
if (worker_.activate (THR_NEW_LWP | THR_JOINABLE,
this->nthreads_) != 0)
{
ACE_ERROR ((LM_ERROR,
"Cannot activate client threads\n"));
}
}
void
RedGreen_Test::resolve_naming_service (ACE_ENV_SINGLE_ARG_DECL)
{
CORBA::Object_var naming_obj =
this->orb_->resolve_initial_references (NAMING_SERVICE_NAME
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
// Need to check return value for errors.
if (CORBA::is_nil (naming_obj.in ()))
{
ACE_THROW (CORBA::UNKNOWN ());
}
this->naming_context_ =
CosNaming::NamingContext::_narrow (naming_obj.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
void
RedGreen_Test::resolve_Notify_factory (ACE_ENV_SINGLE_ARG_DECL)
{
CosNaming::Name name (1);
name.length (1);
name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
CORBA::Object_var obj =
this->naming_context_->resolve (name
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->notify_factory_ =
CosNotifyChannelAdmin::EventChannelFactory::_narrow (
obj.in ()
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
}
void
RedGreen_Test::create_EC (ACE_ENV_SINGLE_ARG_DECL)
{
CosNotifyChannelAdmin::ChannelID id;
this->ec_ = notify_factory_->create_channel (this->initial_qos_,
this->initial_admin_,
id
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
}
void
RedGreen_Test::create_supplieradmin (ACE_ENV_SINGLE_ARG_DECL)
{
CosNotifyChannelAdmin::AdminID adminid;
supplier_admin_ =
ec_->new_for_suppliers (this->ifgop_,
adminid
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
}
void
RedGreen_Test:: create_consumeradmin (ACE_ENV_SINGLE_ARG_DECL)
{
CosNotifyChannelAdmin::AdminID adminid;
consumer_admin_ =
ec_->new_for_consumers (this->ifgop_,
adminid
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
}
void
RedGreen_Test::create_consumers (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_NEW (this->normal_consumer_,
RedGreen_Test_StructuredPushConsumer (this));
this->normal_consumer_->connect (this->consumer_admin_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
ACE_NEW (this->slow_consumer_,
SlowConsumer (this));
this->slow_consumer_->connect (this->consumer_admin_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
void
RedGreen_Test::create_suppliers (ACE_ENV_SINGLE_ARG_DECL)
{
ACE_NEW (this->supplier_,
RedGreen_Test_StructuredPushSupplier ());
this->supplier_->connect (this->supplier_admin_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
void
RedGreen_Test::send_events (ACE_ENV_SINGLE_ARG_DECL)
{
// Setup the Consumer 1 to receive
//event_type : "DOMAIN_GREEN", "DOMAIN_GREEN".
CosNotification::EventTypeSeq added_1(1);
CosNotification::EventTypeSeq removed_1 (0);
added_1[0].domain_name = CORBA::string_dup (DOMAIN_GREEN);
added_1[0].type_name = CORBA::string_dup (TYPE_GREEN);
added_1.length (1);
removed_1.length (0);
this->normal_consumer_->get_proxy_supplier ()->subscription_change (
added_1,
removed_1
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
// Setup the Consumer 2 to receive event_type : "DOMAIN_RED", "TYPE_RED"
CosNotification::EventTypeSeq added_2(1);
CosNotification::EventTypeSeq removed_2 (0);
added_2[0].domain_name = CORBA::string_dup (DOMAIN_RED);
added_2[0].type_name = CORBA::string_dup (TYPE_RED);
added_2.length (1);
removed_2.length (0);
this->slow_consumer_->get_proxy_supplier ()->subscription_change (
added_2,
removed_2
ACE_ENV_ARG_PARAMETER
);
ACE_CHECK;
// Create the events - one of each type
// Event 2
CosNotification::StructuredEvent green_event;
green_event.header.fixed_header.event_type.domain_name =
CORBA::string_dup(DOMAIN_GREEN);
green_event.header.fixed_header.event_type.type_name =
CORBA::string_dup(TYPE_GREEN);
green_event.header.fixed_header.event_name = CORBA::string_dup ("");
green_event.header.variable_header.length (0); // put nothing here
green_event.filterable_data.length (0);
green_event.remainder_of_body <<= (CORBA::Long)10;
// event 3
CosNotification::StructuredEvent red_event;
red_event.header.fixed_header.event_type.domain_name =
CORBA::string_dup(DOMAIN_RED);
red_event.header.fixed_header.event_type.type_name =
CORBA::string_dup(TYPE_RED);
red_event.header.fixed_header.event_name = CORBA::string_dup("");
red_event.header.variable_header.length (0); // put nothing here
red_event.filterable_data.length (0);
red_event.remainder_of_body <<= (CORBA::Long)10;
g_throughput_start_ = ACE_OS::gethrtime ();
// let supplier 1 send all these events
for (int i = 0; i < this->burst_size_; ++i)
{
this->supplier_->send_event (red_event
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
this->supplier_->send_event (green_event
ACE_ENV_ARG_PARAMETER);
ACE_CHECK;
}
}
void
RedGreen_Test::dump_results (void)
{
ACE_Throughput_Stats throughput;
ACE_UINT32 gsf = ACE_High_Res_Timer::global_scale_factor ();
char buf[BUFSIZ];
ACE_OS::sprintf (buf,
"Normal Consumer [%02d]",
1);
normal_consumer_->dump_stats (buf,
gsf);
normal_consumer_->accumulate_into (throughput);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?