sequence.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 399 行
CPP
399 行
//Sequence.cpp,v 1.2 2003/07/16 20:29:51 pradeep Exp
#include "ace/Arg_Shifter.h"
#include "ace/Get_Opt.h"
#include "tao/debug.h"
#include "Sequence.h"
ACE_RCSID (Notify_Tests, Sequence, "Sequence.cpp,v 1.2 2003/07/16 20:29:51 pradeep Exp")
/***************************************************************************/
// Build a consumer that reports every received batch back to @a test_client.
SequencePushConsumer::SequencePushConsumer (Sequence *test_client)
  : test_client_ (test_client)
{
  // Only the back-pointer is stored; no other setup happens here.
}
// Callback invoked with a batch of structured events.
//
// Adds the batch size to the owning test's received-event counter, logs an
// error when the batch is larger than the configured consumer batch size,
// notifies the test that events arrived, and finally sleeps for the
// configured consumer delay.
void
SequencePushConsumer::push_structured_events (
    const CosNotification::EventBatch &batch
    ACE_ENV_ARG_DECL_NOT_USED
  )
  ACE_THROW_SPEC ((CORBA::SystemException,
                   CosEventComm::Disconnected))
{
  Sequence *const client = this->test_client_;
  const CORBA::ULong batch_size = batch.length ();

  // Account for the whole batch before doing any checks.
  client->events_received_ += batch_size;

  if (batch_size > client->consumer_batch_size_)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Error: Received more than max event batch %d\n",
                  batch_size));
    }

  // Let the test decide whether all expected events have arrived.
  client->on_event_received ();

  // Delay the consumer by the configured amount before returning.
  ACE_OS::sleep (client->consumer_delay_);
}
/***************************************************************************/
// Build a supplier that keeps a back-pointer to the owning @a test_client.
SequencePushSupplier::SequencePushSupplier (Sequence* test_client)
  : test_client_ (test_client)
{
  // Only the back-pointer is stored; no other setup happens here.
}
// Destructor: this class releases nothing explicitly.
SequencePushSupplier::~SequencePushSupplier (void)
{
  // Intentionally empty.
}
/***************************************************************************/
// Set the default test configuration.  parse_args () may later replace the
// event count, both batch sizes, the pacing value and the consumer delay.
Sequence::Sequence (void)
  : event_count_ (15),                                   // events to send/receive
    supplier_batch_size_ (5),                            // events per supplier batch
    consumer_batch_size_ (3),                            // max events per consumer batch
    pacing_ (2),                                         // value used for PacingInterval
    order_policy_ (CosNotification::PriorityOrder),      // value used for OrderPolicy
    events_received_ (0),                                // nothing received yet
    consumer_delay_ (1)                                  // delay applied after each batch
{
  // All state is established by the initializer list.
}
// Destructor: no explicit cleanup is performed here.
Sequence::~Sequence (void)
{
  // Intentionally empty.
}
int
Sequence::init (int argc,
char* argv []
ACE_ENV_ARG_DECL)
{
if (TAO_debug_level)
{
ACE_DEBUG ((LM_DEBUG,
"Options: event count = %d \n"
"supplier batch size = %d \n"
"consumer batch size = %d \n"
"pacing = %d \n"
, event_count_
, supplier_batch_size_
, consumer_batch_size_
, pacing_));
ACE_DEBUG ((LM_DEBUG, "consumer delay = %d\n", consumer_delay_.sec ()));
}
// Initialize the base class.
Notify_Test_Client::init (argc,
argv
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
// Create all participents.
this->create_EC (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CosNotifyChannelAdmin::AdminID adminid;
this->supplier_admin_ =
this->ec_->new_for_suppliers (this->ifgop_,
adminid
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
this->consumer_admin_ =
this->ec_->new_for_consumers (this->ifgop_,
adminid
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
ACE_NEW_RETURN (this->consumer_,
SequencePushConsumer (this),
-1);
this->consumer_->init (root_poa_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
this->consumer_->connect (this->consumer_admin_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CosNotification::QoSProperties properties (3);
properties.length (3);
properties[0].name = CORBA::string_dup (CosNotification::MaximumBatchSize);
properties[0].value <<= (CORBA::Long) this->consumer_batch_size_;
properties[1].name = CORBA::string_dup (CosNotification::PacingInterval);
properties[1].value <<= (TimeBase::TimeT) this->pacing_;
properties[2].name = CORBA::string_dup (CosNotification::OrderPolicy);
properties[2].value <<= this->order_policy_;
this->consumer_->get_proxy_supplier ()->set_qos (properties);
ACE_NEW_RETURN (this->supplier_,
SequencePushSupplier (this),
-1);
this->supplier_->init (root_poa_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
this->supplier_->connect (this->supplier_admin_.in ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
return 0;
}
int
Sequence::parse_args (int argc,
char *argv[])
{
ACE_Arg_Shifter arg_shifter (argc,
argv);
const char *current_arg = 0;
while (arg_shifter.is_anything_left ())
{
if ((current_arg = arg_shifter.get_the_parameter ("-events")))
{
this->event_count_ = ACE_OS::atoi (current_arg); // The number of events to send/receive.
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-SupplierBatchSize")))
{
this->supplier_batch_size_ = ACE_OS::atoi (current_arg); // Supplier batch size
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-ConsumerBatchSize")))
{
this->consumer_batch_size_ = ACE_OS::atoi (current_arg); // Consumer batch size
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-ConsumerDelay")))
{
this->consumer_delay_ = ACE_Time_Value (ACE_OS::atoi (current_arg), 0); // Consumer delay in secs.
arg_shifter.consume_arg ();
}
else if ((current_arg = arg_shifter.get_the_parameter ("-Pacing")))
{
this->pacing_ = (TimeBase::TimeT) ACE_OS::atoi (current_arg); // pacing
arg_shifter.consume_arg ();
}
else if (arg_shifter.cur_arg_strncasecmp ("-?") == 0)
{
ACE_DEBUG((LM_DEBUG,
"usage: %s "
"-events event_count "
"-SupplierBatchSize size "
"-ConsumerBatchSize size "
"-ConsumerDelay delay "
"-Pacing pacing \n",
argv[0], argv[0]));
arg_shifter.consume_arg ();
return -1;
}
else
{
arg_shifter.ignore_arg ();
}
}
return 0;
}
// Create the event channel through the notification factory, using the
// initial QoS and admin properties, and keep the result in ec_.
void
Sequence::create_EC (ACE_ENV_SINGLE_ARG_DECL)
{
  // Passed uninitialized to create_channel () and not read afterwards;
  // presumably an out parameter receiving the new channel's id.
  CosNotifyChannelAdmin::ChannelID channel_id;

  this->ec_ =
    notify_factory_->create_channel (this->initial_qos_,
                                     this->initial_admin_,
                                     channel_id
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // A nil channel reference at this point is treated as a programming error.
  ACE_ASSERT (!CORBA::is_nil (this->ec_.in ()));
}
// Called after the received-event counter has been updated.  Traces the
// counter when TAO debugging is enabled and ends the test once the counter
// equals the expected event count.
void
Sequence::on_event_received (void)
{
  if (TAO_debug_level)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Events received = %d\n",
                  this->events_received_.value ()));
    }

  // Still waiting for more events (or the count does not match): nothing to do.
  if (this->events_received_.value () != this->event_count_)
    return;

  ACE_DECLARE_NEW_CORBA_ENV;
  this->end_test (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}
// Build event_count_ structured events from one template event and push
// them through the supplier in batches of supplier_batch_size_ events; a
// final shorter batch carries any remainder.
//
// Each event carries its index in the "threshold" field and in the body
// remainder, and a Priority header property that starts at
// CosNotification::LowestPriority and grows by one per event.
//
// Raises CORBA::BAD_PARAM when supplier_batch_size_ is zero, because a
// batch slot could not be written in that case.
void
Sequence::run_test (ACE_ENV_SINGLE_ARG_DECL)
{
  // An empty batch cannot hold even one event; fail instead of writing
  // past the end of the sequence.
  if (this->supplier_batch_size_ < 1)
    {
      ACE_DEBUG ((LM_ERROR,
                  "Error: supplier batch size must be at least 1\n"));
      ACE_THROW (CORBA::BAD_PARAM ());
    }

  // Template event, copied into the batch once per iteration.
  CosNotification::StructuredEvent event;

  // Fixed header: wildcard event type plus an event name.
  event.header.fixed_header.event_type.domain_name = CORBA::string_dup("*");
  event.header.fixed_header.event_type.type_name = CORBA::string_dup("*");
  event.header.fixed_header.event_name = CORBA::string_dup("myevent");

  // Variable header: a single Priority property.  The name never changes,
  // so it is set once here; only the value is updated inside the loop.
  CosNotification::PropertySeq& qos = event.header.variable_header;
  qos.length (1);
  qos[0].name = CORBA::string_dup (CosNotification::Priority);

  // Filterable body: three name/value pairs.  "threshold" gets its value
  // inside the loop; the other two are constant.
  event.filterable_data.length (3);
  event.filterable_data[0].name = CORBA::string_dup("threshold");
  event.filterable_data[1].name = CORBA::string_dup("temperature");
  event.filterable_data[1].value <<= (CORBA::Long)70;
  event.filterable_data[2].name = CORBA::string_dup("pressure");
  event.filterable_data[2].value <<= (CORBA::Long)80;

  CORBA::Short prio = CosNotification::LowestPriority;

  // The batch keeps its full length for the whole loop; it is only
  // shortened for the trailing partial batch.
  CosNotification::EventBatch batch;
  batch.length (this->supplier_batch_size_);
  CORBA::ULong batch_index = 0;

  for (int i = 0; i < this->event_count_; ++i)
    {
      event.filterable_data[0].value <<= (CORBA::Long)i;
      event.remainder_of_body <<= (CORBA::Long)i;
      qos[0].value <<= (CORBA::Short)prio++;

      batch[batch_index] = event;
      ++batch_index;

      if (batch_index == this->supplier_batch_size_)
        {
          if (TAO_debug_level)
            ACE_DEBUG ((LM_DEBUG, "Sending batch with %d events\n", batch.length ()));
          this->supplier_->send_events (batch
                                        ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;

          // Start filling the batch from the beginning again.
          batch_index = 0;
        }
    } // for

  // Send the last, partially filled batch.
  if (batch_index > 0)
    {
      batch.length (batch_index); // set the correct length
      if (TAO_debug_level)
        ACE_DEBUG ((LM_DEBUG, "Sending batch with %d events\n", batch.length ()));
      this->supplier_->send_events (batch
                                    ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
}
// Finish the test run by calling shutdown ().
void
Sequence::end_test (ACE_ENV_SINGLE_ARG_DECL)
{
  // shutdown () is the only step; its outcome is left to the caller's
  // environment check.
  this->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
}
// Destroy the event channel and report the outcome of the test.
//
// Returns 0 when the number of events received equals the expected event
// count, 1 when it does not, and -1 when destroying the channel fails.
int
Sequence::check_results (void)
{
  // Tear down the channel first; the verdict is printed afterwards.
  ACE_DECLARE_NEW_CORBA_ENV;
  this->ec_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (this->events_received_.value () != this->event_count_)
    {
      ACE_DEBUG ((LM_DEBUG,
                  "Sequence test failed!\n"));
      return 1;
    }

  ACE_DEBUG ((LM_DEBUG,
              "Sequence test success\n"));
  return 0;
}
/***************************************************************************/
// Test driver: parse the options, set up the test, send the events, run the
// ORB, then report the result.
//
// Returns the value of Sequence::check_results () when the run completes,
// or 1 when option parsing or initialization fails or a CORBA exception is
// caught.
int
main (int argc, char* argv[])
{
  Sequence events;

  if (events.parse_args (argc, argv) == -1)
    {
      return 1;
    }

  ACE_TRY_NEW_ENV
    {
      const int init_status = events.init (argc,
                                           argv
                                           ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // init () can also fail without raising an exception (for example
      // when an allocation fails); do not run the test in that case.
      if (init_status == -1)
        {
          ACE_DEBUG ((LM_ERROR,
                      "Sequence: initialization failed\n"));
          return 1;
        }

      events.run_test (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      events.ORB_run ();
    }
  ACE_CATCH (CORBA::UserException, ue)
    {
      ACE_PRINT_EXCEPTION (ue,
                           "Sequence user error: ");
      return 1;
    }
  ACE_CATCH (CORBA::SystemException, se)
    {
      ACE_PRINT_EXCEPTION (se,
                           "Sequence system error: ");
      return 1;
    }
  ACE_ENDTRY;

  return events.check_results ();
}
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#endif /*ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?