client.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 889 行 · 第 1/2 页
CPP
889 行
// client.cpp,v 1.9 2003/12/24 17:46:34 bala Exp
#include "Reply_Handler.h"
#include "Client_Task.h"
#include "tao/Messaging/Messaging.h"
#include "tao/TAOC.h"
#include "tao/TAOA.h"
#include "ace/Get_Opt.h"
ACE_RCSID(AMI_Buffering, client, "client.cpp,v 1.9 2003/12/24 17:46:34 bala Exp")
// IOR of the Test::AMI_Buffering object used by the tests; replaced by
// the argument of -k.
const char *server_ior = "file://server.ior";
// IOR of the Test::AMI_Buffering_Admin object; replaced by the
// argument of -a.
const char *admin_ior = "file://admin.ior";
// Iteration count; replaced by the argument of -i.  The code that
// consumes it is outside the part of the file visible here.
int iterations = 200;
// Test selection flags, set by -c, -t, -r and -b respectively.  main()
// checks them in this order and runs only the first one that is set.
int run_message_count_test = 0;
int run_timeout_test = 0;
int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;
// Number of octets in each payload sent by run_liveness_test().
const int PAYLOAD_LENGTH = 1024;
// message_count limit that run_message_count() puts in its buffering
// constraint.
const int BUFFERED_MESSAGES_COUNT = 50;
// NOTE(review): presumably the flush timeout for the timeout tests, in
// milliseconds; its use is outside the part of the file visible here.
const int TIMEOUT_MILLISECONDS = 50;
// NOTE(review): presumably the byte limit for the buffer size test; its
// use is outside the part of the file visible here.
const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH;
/// Fraction of the sent messages that the server must already have
/// counted when run_liveness_test() polls it, i.e. no more than 10% of
/// the messages may still be unsent.
const double LIVENESS_TOLERANCE = 0.9;
/// Limit the depth of the liveness test, avoid blowing up the stack
/// on the server.  run_liveness_test() re-syncs with the server when
/// its per-request counter reaches this value.
const int LIVENESS_MAX_DEPTH = 256;
/// Factor in GIOP overhead in the buffer size test
// NOTE(review): the use of this factor is outside the part of the file
// visible here.
const double GIOP_OVERHEAD = 0.9;
int
parse_args (int argc, char *argv[])
{
ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr");
int c;
while ((c = get_opts ()) != -1)
switch (c)
{
case 'k':
server_ior = get_opts.opt_arg ();
break;
case 'a':
admin_ior = get_opts.opt_arg ();
break;
case 'i':
iterations = ACE_OS::atoi (get_opts.opt_arg ());
break;
case 'c':
run_message_count_test = 1;
break;
case 't':
run_timeout_test = 1;
break;
case 'b':
run_buffer_size_test = 1;
break;
case 'r':
run_timeout_reactive_test = 1;
break;
case '?':
default:
ACE_ERROR_RETURN ((LM_ERROR,
"usage: %s "
"-k <server_ior> "
"-a <admin_ior> "
"-i <iterations> "
"<-c|-t|-b|-r> "
"\n",
argv [0]),
-1);
}
// Indicates sucessful parsing of the command line
return 0;
}
// Forward declarations of the test routines dispatched from main().
// Each one receives the ORB, the Test::AMI_Buffering reference and the
// Test::AMI_Buffering_Admin reference, and its int result is stored by
// main() in test_failed and used as the process exit status.

// Selected with -c (run_message_count_test).
int
run_message_count (CORBA::ORB_ptr orb,
Test::AMI_Buffering_ptr ami_buffering,
Test::AMI_Buffering_Admin_ptr ami_buffering_admin
ACE_ENV_ARG_DECL);
// Selected with -t (run_timeout_test).
int
run_timeout (CORBA::ORB_ptr orb,
Test::AMI_Buffering_ptr ami_buffering,
Test::AMI_Buffering_Admin_ptr ami_buffering_admin
ACE_ENV_ARG_DECL);
// Selected with -r (run_timeout_reactive_test).
// NOTE(review): the parameters are named oneway_buffering* here while
// the sibling declarations use ami_buffering*; the types are the same,
// so this looks like a naming leftover -- confirm against the definition.
int
run_timeout_reactive (CORBA::ORB_ptr orb,
Test::AMI_Buffering_ptr oneway_buffering,
Test::AMI_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL);
// Selected with -b (run_buffer_size_test).
int
run_buffer_size (CORBA::ORB_ptr orb,
Test::AMI_Buffering_ptr ami_buffering,
Test::AMI_Buffering_Admin_ptr ami_buffering_admin
ACE_ENV_ARG_DECL);
/// Test driver entry point.
///
/// Initializes the ORB, activates the RootPOA's POA manager, parses the
/// command line, resolves the Test::AMI_Buffering and
/// Test::AMI_Buffering_Admin references, starts a Client_Task, runs the
/// first test selected on the command line, and finally shuts both
/// remote objects down and destroys the POA and the ORB.
///
/// @return the value returned by the selected test routine; 1 when the
///         RootPOA or one of the object references is nil, when
///         parse_args() fails, or when an exception is caught.
int
main (int argc, char *argv[])
{
int test_failed = 0;
ACE_TRY_NEW_ENV
{
CORBA::ORB_var orb =
CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Obtain the RootPOA and activate its POA manager.
// NOTE(review): presumably required so that AMI reply handler servants
// can receive replies -- confirm.
CORBA::Object_var poa_object =
orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
PortableServer::POA_var root_poa =
PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (CORBA::is_nil (root_poa.in ()))
ACE_ERROR_RETURN ((LM_ERROR,
" (%P|%t) Panic: nil RootPOA\n"),
1);
PortableServer::POAManager_var poa_manager =
root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
// The test's own options are parsed only after ORB_init() has seen
// argc/argv.
// NOTE(review): presumably so ORB_init() can consume its own options
// first -- confirm.
if (parse_args (argc, argv) != 0)
return 1;
// Resolve and narrow the object under test.
CORBA::Object_var tmp =
orb->string_to_object(server_ior ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
Test::AMI_Buffering_var ami_buffering =
Test::AMI_Buffering::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (CORBA::is_nil (ami_buffering.in ()))
{
ACE_ERROR_RETURN ((LM_DEBUG,
"Nil Test::AMI_Buffering reference <%s>\n",
server_ior),
1);
}
// Resolve and narrow the admin object; `tmp' is reused for it.
tmp =
orb->string_to_object(admin_ior ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
Test::AMI_Buffering_Admin_var ami_buffering_admin =
Test::AMI_Buffering_Admin::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
if (CORBA::is_nil (ami_buffering_admin.in ()))
{
ACE_ERROR_RETURN ((LM_DEBUG,
"Nil Test::AMI_Buffering_Admin reference <%s>\n",
admin_ior),
1);
}
// Start the helper task declared in Client_Task.h.
// NOTE(review): presumably it runs the ORB event loop in its own
// thread -- confirm.  An activation failure is only logged; the test
// still proceeds.
Client_Task client_task (orb.in ());
if (client_task.activate (THR_NEW_LWP | THR_JOINABLE) == -1)
{
ACE_ERROR ((LM_ERROR, "Error activating client task\n"));
}
// Run exactly one test: the first flag that is set wins.
if (run_message_count_test)
{
ACE_DEBUG ((LM_DEBUG,
"Running message count flushing test\n"));
test_failed =
run_message_count (orb.in (),
ami_buffering.in (),
ami_buffering_admin.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
else if (run_timeout_test)
{
ACE_DEBUG ((LM_DEBUG,
"Running timeout flushing test\n"));
test_failed =
run_timeout (orb.in (),
ami_buffering.in (),
ami_buffering_admin.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
else if (run_timeout_reactive_test)
{
ACE_DEBUG ((LM_DEBUG,
"Running timeout (reactive) flushing test\n"));
test_failed =
run_timeout_reactive (orb.in (),
ami_buffering.in (),
ami_buffering_admin.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
else if (run_buffer_size_test)
{
ACE_DEBUG ((LM_DEBUG,
"Running buffer size flushing test\n"));
test_failed =
run_buffer_size (orb.in (),
ami_buffering.in (),
ami_buffering_admin.in ()
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
else
{
// NOTE(review): test_failed stays 0 on this path, so the process
// still exits with status 0 although an error is logged.
ACE_ERROR ((LM_ERROR,
"ERROR: No test was configured\n"));
}
// Stop the helper task and wait for its thread(s) before tearing
// anything down.
client_task.terminate_loop ();
client_task.thr_mgr ()->wait ();
// Ask both remote objects to shut down, then destroy the local POA
// and ORB.
ami_buffering->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
ami_buffering_admin->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
// Any exception is reported and turned into a failing exit status.
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Exception caught:");
return 1;
}
ACE_ENDTRY;
return test_failed;
}
/// Install the buffering policies used by a test and build a reference
/// that can be used to flush the buffered requests.
///
/// Two overrides are added on the ORB's PolicyCurrent: a SyncScope
/// policy of Messaging::SYNC_NONE and a buffering constraint policy
/// carrying @a buffering_constraint.  A second reference is then
/// derived from @a ami_buffering with a TAO::BUFFER_FLUSH constraint
/// override and returned through @a flusher.
///
/// @param orb                   ORB used to resolve PolicyCurrent and
///                              to create the policies.
/// @param buffering_constraint  constraint installed on PolicyCurrent.
/// @param ami_buffering         object the flusher reference is
///                              derived from.
/// @param flusher               receives the narrowed reference that
///                              carries the BUFFER_FLUSH override.
///
/// @return 0 on success, 1 when PolicyCurrent narrows to nil, -1 when
///         one of the ACE_CHECK_RETURN checks detects an exception.
int
configure_policies (CORBA::ORB_ptr orb,
                    const TAO::BufferingConstraint &buffering_constraint,
                    Test::AMI_Buffering_ptr ami_buffering,
                    Test::AMI_Buffering_out flusher
                    ACE_ENV_ARG_DECL)
{
  CORBA::Object_var reference =
    orb->resolve_initial_references ("PolicyCurrent" ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::PolicyCurrent_var current =
    CORBA::PolicyCurrent::_narrow (reference.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (CORBA::is_nil (current.in ()))
    {
      ACE_ERROR ((LM_ERROR, "ERROR: Nil policy current\n"));
      return 1;
    }

  // Stage 1: SYNC_NONE scope plus the caller's buffering constraint,
  // installed on PolicyCurrent.
  CORBA::Any sync_scope_value;
  sync_scope_value <<= Messaging::SYNC_NONE;

  CORBA::Any constraint_value;
  constraint_value <<= buffering_constraint;

  CORBA::PolicyList overrides (2);
  overrides.length (2);

  overrides[0] =
    orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
                        sync_scope_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  overrides[1] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        constraint_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  current->set_policy_overrides (overrides, CORBA::ADD_OVERRIDE
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // The policy objects are destroyed once the overrides are installed.
  overrides[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  overrides[1]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // Stage 2: a BUFFER_FLUSH constraint applied as an object-level
  // override, producing the flusher reference.
  TAO::BufferingConstraint flush_now;
  flush_now.mode = TAO::BUFFER_FLUSH;
  flush_now.message_count = 0;
  flush_now.message_bytes = 0;
  flush_now.timeout = 0;

  constraint_value <<= flush_now;

  overrides.length (1);
  overrides[0] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        constraint_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  reference =
    ami_buffering->_set_policy_overrides (overrides,
                                          CORBA::ADD_OVERRIDE
                                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  overrides[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  flusher =
    Test::AMI_Buffering::_narrow (reference.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  return 0;
}
/// Get the client back in sync with the server.
///
/// Calls flush() and then sync() on @a flusher, and afterwards runs the
/// ORB event loop for 100000 microseconds so that queued responses are
/// drained.
///
/// @param orb      ORB whose event loop is run.
/// @param flusher  reference on which flush() and sync() are invoked.
void
sync_server (CORBA::ORB_ptr orb,
             Test::AMI_Buffering_ptr flusher
             ACE_ENV_ARG_DECL)
{
  // Push out whatever is buffered, then synchronize with the server.
  flusher->flush (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  flusher->sync (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Give the ORB a bounded amount of time to drain pending responses.
  ACE_Time_Value drain_period (0, 100000);
  orb->run (drain_period ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}
/// Check that requests keep reaching the server.
///
/// After re-syncing with the server, the current request count is read
/// from @a ami_buffering_admin and that many additional
/// sendc_receive_data() requests are issued through @a ami_buffering.
/// After every request the server's count is read again; the test
/// fails if it is below LIVENESS_TOLERANCE times the number of
/// requests sent so far.
///
/// @param orb                  ORB passed on to sync_server().
/// @param reply_handler        handler passed to sendc_receive_data().
/// @param ami_buffering        reference the requests are sent through.
/// @param flusher              reference passed to sync_server().
/// @param ami_buffering_admin  admin object queried for the count.
///
/// @return 0 when every check passed, 1 when at least one iteration
///         found the server too far behind, -1 when one of the
///         ACE_CHECK_RETURN checks detects an exception.
int
run_liveness_test (CORBA::ORB_ptr orb,
                   Test::AMI_AMI_BufferingHandler_ptr reply_handler,
                   Test::AMI_Buffering_ptr ami_buffering,
                   Test::AMI_Buffering_ptr flusher,
                   Test::AMI_Buffering_Admin_ptr ami_buffering_admin
                   ACE_ENV_ARG_DECL)
{
  ACE_DEBUG ((LM_DEBUG, ".... checking for liveness\n"));
  int test_failed = 0;

  // Get back in sync with the server...
  sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // The server's current count is both the starting value of the send
  // counter and the number of extra requests to issue.
  CORBA::ULong send_count =
    ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  int liveness_test_iterations = int(send_count);

  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int j = 0; j != PAYLOAD_LENGTH; ++j)
    payload[j] = CORBA::Octet(j % 256);

  // Number of requests issued since the last depth-triggered sync.
  int depth = 0;
  for (int i = 0; i != liveness_test_iterations; ++i)
    {
      ami_buffering->sendc_receive_data (reply_handler,
                                         payload
                                         ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      send_count++;

      CORBA::ULong receive_count =
        ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      // Once the system has sent enough messages we don't
      // expect it to fall too far behind, i.e. at least 90% of the
      // messages should be delivered....
      CORBA::ULong expected =
        CORBA::ULong (LIVENESS_TOLERANCE * send_count);

      if (receive_count < expected)
        {
          test_failed = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d "
                      "not enough messages received %u "
                      "expected %u\n",
                      i, receive_count, expected));

          sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
        }

      // Pre-increment so that a sync happens after exactly
      // LIVENESS_MAX_DEPTH requests; comparing the post-incremented
      // value let LIVENESS_MAX_DEPTH + 1 requests through per round.
      if (++depth == LIVENESS_MAX_DEPTH)
        {
          sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          depth = 0;
        }
    }

  return test_failed;
}
int
run_message_count (CORBA::ORB_ptr orb,
Test::AMI_Buffering_ptr ami_buffering,
Test::AMI_Buffering_Admin_ptr ami_buffering_admin
ACE_ENV_ARG_DECL)
{
TAO::BufferingConstraint buffering_constraint;
buffering_constraint.mode = TAO::BUFFER_MESSAGE_COUNT;
buffering_constraint.message_count = BUFFERED_MESSAGES_COUNT;
buffering_constraint.message_bytes = 0;
buffering_constraint.timeout = 0;
Test::AMI_Buffering_var flusher;
int test_failed =
configure_policies (orb, buffering_constraint,
ami_buffering, flusher.out ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (test_failed != 0)
return test_failed;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?