client.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 792 行 · 第 1/2 页
CPP
792 行
// client.cpp,v 1.12 2003/12/24 17:46:34 bala Exp
#include "TestC.h"
#include "tao/Messaging/Messaging.h"
#include "tao/TAOC.h"
#include "tao/TAOA.h"
#include "ace/Get_Opt.h"
#include "ace/OS_NS_sys_time.h"
ACE_RCSID(Oneway_Buffering, client, "client.cpp,v 1.12 2003/12/24 17:46:34 bala Exp")
/// IOR of the server object; this default is replaced by the -k option
/// in parse_args().
const char *server_ior = "file://server.ior";
/// IOR of the admin object; this default is replaced by the -a option
/// in parse_args().
const char *admin_ior = "file://admin.ior";
/// Iteration count, settable with -i.
/// NOTE(review): presumably the number of messages each run_* test
/// sends -- confirm in the test functions.
int iterations = 200;
/// Test selection flags.  parse_args() sets them to 1 for -c, -t, -r
/// and -b respectively; main() runs only the first one that is set,
/// checking them in the order message count, timeout, timeout
/// (reactive), buffer size.
int run_message_count_test = 0;
int run_timeout_test = 0;
int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;
/// Number of octets in the payload built by run_liveness_test().
const int PAYLOAD_LENGTH = 1024;
/// Value stored in BufferingConstraint::message_count by
/// run_message_count().
const int BUFFERED_MESSAGES_COUNT = 50;
/// NOTE(review): presumably the flush timeout used by the timeout
/// tests -- confirm in run_timeout() / run_timeout_reactive().
const int TIMEOUT_MILLISECONDS = 50;
/// 64 payloads' worth of bytes.
/// NOTE(review): presumably the byte threshold used by the buffer size
/// test -- confirm in run_buffer_size().
const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH;
/// Liveness threshold: run_liveness_test() fails an iteration when the
/// server has received fewer than this fraction of the messages sent,
/// i.e. no more than 10% of the messages may be outstanding.
const double LIVENESS_TOLERANCE = 0.9;
/// Limit the depth of the liveness test, avoid blowing up the stack
/// on the server.  run_liveness_test() calls sync_server() each time
/// its depth counter reaches this value.
const int LIVENESS_MAX_DEPTH = 256;
/// Factor in GIOP overhead in the buffer size test
const double GIOP_OVERHEAD = 0.9;
/// Parse the command line into the file-scope configuration variables.
///
/// Options understood (see the option string below):
///   -k <server_ior>   overrides server_ior
///   -a <admin_ior>    overrides admin_ior
///   -i <iterations>   overrides iterations (converted with ACE_OS::atoi)
///   -c / -t / -b / -r switch on the message count, timeout, buffer size
///                     and timeout (reactive) tests respectively
///
/// @param argc  Argument count as passed to main().
/// @param argv  Argument vector as passed to main(); argv[0] is printed
///              in the usage message.
/// @return 0 once every option has been consumed, or -1 (after logging
///         the usage message) when the option parser yields any
///         character that is not handled below, including '?'.
int
parse_args (int argc, char *argv[])
{
  ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr");

  for (int opt = get_opts (); opt != -1; opt = get_opts ())
    {
      switch (opt)
        {
        // Options that carry an argument.
        case 'k':
          server_ior = get_opts.opt_arg ();
          break;

        case 'a':
          admin_ior = get_opts.opt_arg ();
          break;

        case 'i':
          iterations = ACE_OS::atoi (get_opts.opt_arg ());
          break;

        // Plain flags, one per test.
        case 'c':
          run_message_count_test = 1;
          break;

        case 't':
          run_timeout_test = 1;
          break;

        case 'b':
          run_buffer_size_test = 1;
          break;

        case 'r':
          run_timeout_reactive_test = 1;
          break;

        // Anything else, '?' included, is a usage error.
        default:
          ACE_ERROR_RETURN ((LM_ERROR,
                             "usage:  %s "
                             "-k <server_ior> "
                             "-a <admin_ior> "
                             "-i <iterations> "
                             "<-c|-t|-b|-r> "
                             "\n",
                             argv [0]),
                            -1);
        }
    }

  // Indicates successful parsing of the command line
  return 0;
}
// Forward declarations of the individual tests so that main() can
// dispatch to them.  All four share one signature: the ORB, the object
// under test and its admin companion, plus the ACE environment
// argument.  main() stores the int result in test_failed and uses it
// as the process exit status.

/// Message count flushing test; main() calls it when -c was given.
int
run_message_count (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL);
/// Timeout flushing test; main() calls it when -t was given.
int
run_timeout (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL);
/// Timeout (reactive) flushing test; main() calls it when -r was given.
int
run_timeout_reactive (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL);
/// Buffer size flushing test; main() calls it when -b was given.
int
run_buffer_size (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL);
/// Test driver entry point.
///
/// Initializes the ORB, parses the command line, resolves and narrows
/// the server and admin object references, runs the first test that was
/// selected on the command line (checked in the order -c, -t, -r, -b),
/// then shuts down both remote objects and destroys the ORB.
///
/// @param argc  Argument count; handed to CORBA::ORB_init() and then to
///              parse_args().
/// @param argv  Argument vector; handed to CORBA::ORB_init() and then to
///              parse_args().
/// @return The selected test's result (0 on success).  1 is returned on
///         a command-line error, a nil object reference, a caught
///         exception, or when no test was selected at all.
int
main (int argc, char *argv[])
{
  int test_failed = 0;

  ACE_TRY_NEW_ENV
    {
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (parse_args (argc, argv) != 0)
        return 1;

      // Resolve the object under test.
      CORBA::Object_var tmp =
        orb->string_to_object(server_ior ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      Test::Oneway_Buffering_var oneway_buffering =
        Test::Oneway_Buffering::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (oneway_buffering.in ()))
        {
          // This aborts the run, so report it at error priority rather
          // than as a debug message.
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Nil Test::Oneway_Buffering reference <%s>\n",
                             server_ior),
                            1);
        }

      // Resolve the admin object that reports the server's request count.
      tmp =
        orb->string_to_object(admin_ior ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      Test::Oneway_Buffering_Admin_var oneway_buffering_admin =
        Test::Oneway_Buffering_Admin::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (oneway_buffering_admin.in ()))
        {
          ACE_ERROR_RETURN ((LM_ERROR,
                             "Nil Test::Oneway_Buffering_Admin reference <%s>\n",
                             admin_ior),
                            1);
        }

      // Only one test runs per invocation: the first flag that is set wins.
      if (run_message_count_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running message count flushing test\n"));
          test_failed =
            run_message_count (orb.in (),
                               oneway_buffering.in (),
                               oneway_buffering_admin.in ()
                               ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_timeout_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running timeout flushing test\n"));
          test_failed =
            run_timeout (orb.in (),
                         oneway_buffering.in (),
                         oneway_buffering_admin.in ()
                         ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_timeout_reactive_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running timeout (reactive) flushing test\n"));
          test_failed =
            run_timeout_reactive (orb.in (),
                                  oneway_buffering.in (),
                                  oneway_buffering_admin.in ()
                                  ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_buffer_size_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running buffer size flushing test\n"));
          test_failed =
            run_buffer_size (orb.in (),
                             oneway_buffering.in (),
                             oneway_buffering_admin.in ()
                             ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else
        {
          ACE_ERROR ((LM_ERROR,
                      "ERROR: No test was configured\n"));
          // Running nothing must not be reported as a pass; the remote
          // objects are still shut down below.
          test_failed = 1;
        }

      // Tear down the remote objects, then the local ORB.
      oneway_buffering->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      oneway_buffering_admin->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught:");
      return 1;
    }
  ACE_ENDTRY;

  return test_failed;
}
/// Install the buffering policies used by a test and build a second
/// reference that carries a BUFFER_FLUSH constraint.
///
/// Phase 1 sets two overrides on the object resolved as "PolicyCurrent":
/// a SYNC_SCOPE policy holding Messaging::SYNC_NONE and a buffering
/// constraint policy holding @a buffering_constraint.
/// Phase 2 applies a BUFFER_FLUSH buffering constraint to
/// @a oneway_buffering through _set_policy_overrides() and narrows the
/// resulting object into @a flusher.
///
/// @param orb                   ORB used to resolve "PolicyCurrent" and
///                              to create the policy objects.
/// @param buffering_constraint  Constraint installed on the policy
///                              current.
/// @param oneway_buffering      Reference the flush override is derived
///                              from.
/// @param flusher               Receives the narrowed reference that has
///                              the BUFFER_FLUSH override.
/// @return 0 on success, 1 when the policy current is nil, and -1 from
///         any of the ACE_CHECK_RETURN checks.
int
configure_policies (CORBA::ORB_ptr orb,
                    const TAO::BufferingConstraint &buffering_constraint,
                    Test::Oneway_Buffering_ptr oneway_buffering,
                    Test::Oneway_Buffering_out flusher
                    ACE_ENV_ARG_DECL)
{
  // --- Locate the policy current -------------------------------------
  CORBA::Object_var obj =
    orb->resolve_initial_references ("PolicyCurrent" ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::PolicyCurrent_var current =
    CORBA::PolicyCurrent::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (CORBA::is_nil (current.in ()))
    {
      ACE_ERROR ((LM_ERROR, "ERROR: Nil policy current\n"));
      return 1;
    }

  // --- Phase 1: overrides on the policy current -----------------------
  CORBA::Any sync_scope_value;
  sync_scope_value <<= Messaging::SYNC_NONE;

  CORBA::Any constraint_value;
  constraint_value <<= buffering_constraint;

  CORBA::PolicyList overrides (2);
  overrides.length (2);

  overrides[0] =
    orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
                        sync_scope_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  overrides[1] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        constraint_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  current->set_policy_overrides (overrides, CORBA::ADD_OVERRIDE
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // The policy objects are destroyed once they have been handed over.
  overrides[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  overrides[1]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // --- Phase 2: reference with a BUFFER_FLUSH constraint --------------
  TAO::BufferingConstraint flush_now;
  flush_now.mode = TAO::BUFFER_FLUSH;
  flush_now.message_count = 0;
  flush_now.message_bytes = 0;
  flush_now.timeout = 0;

  // The same Any is reused for the second constraint value.
  constraint_value <<= flush_now;

  overrides.length (1);
  overrides[0] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        constraint_value
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  obj =
    oneway_buffering->_set_policy_overrides (overrides,
                                             CORBA::ADD_OVERRIDE
                                             ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  overrides[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  flusher =
    Test::Oneway_Buffering::_narrow (obj.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  return 0;
}
/// Get the client back in sync with the server by calling flush() and
/// then sync() on @a flusher, in that order.
///
/// run_liveness_test() calls this before it starts counting, whenever
/// the server has fallen behind, and each time its depth counter
/// reaches LIVENESS_MAX_DEPTH.
///
/// NOTE(review): flush() and sync() are operations of the
/// Test::Oneway_Buffering interface; presumably flush() pushes out the
/// buffered oneways and sync() returns only after the server has
/// processed them -- confirm against the IDL.
///
/// @param flusher  Reference the two calls are made on; callers pass the
///                 reference produced by configure_policies().
void
sync_server (Test::Oneway_Buffering_ptr flusher
ACE_ENV_ARG_DECL)
{
// Get back in sync with the server...
flusher->flush (ACE_ENV_SINGLE_ARG_PARAMETER);
// Do not go on to sync() if flush() reported a problem.
ACE_CHECK;
flusher->sync (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK;
}
/// Verify that oneway requests keep reaching the server.
///
/// After an initial sync_server(), the server's current request count is
/// read from the admin object.  That count is both the starting value of
/// the "sent" tally and the number of rounds to run.  Each round sends
/// one PAYLOAD_LENGTH-octet payload, re-reads the request count and
/// compares it with LIVENESS_TOLERANCE times the number sent so far.
///
/// @param oneway_buffering        Reference the payloads are sent on.
/// @param flusher                 Reference handed to sync_server().
/// @param oneway_buffering_admin  Source of the server's request count.
/// @return 0 when every round met the threshold, 1 when at least one
///         round did not, and -1 from any of the ACE_CHECK_RETURN
///         checks.
int
run_liveness_test (Test::Oneway_Buffering_ptr oneway_buffering,
                   Test::Oneway_Buffering_ptr flusher,
                   Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
                   ACE_ENV_ARG_DECL)
{
  ACE_DEBUG ((LM_DEBUG, ".... checking for liveness\n"));

  int verdict = 0;

  // Start from a state where client and server agree.
  sync_server (flusher ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::ULong sent =
    oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  // Run as many rounds as the server has already counted.
  const int rounds = static_cast<int> (sent);

  // Payload octets cycle through 0..255.
  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int pos = 0; pos != PAYLOAD_LENGTH; ++pos)
    {
      payload[pos] = CORBA::Octet (pos % 256);
    }

  int since_last_sync = 0;
  int round = 0;
  while (round != rounds)
    {
      oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
      ++sent;

      const CORBA::ULong received =
        oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      // Once enough messages have been sent the server must not lag
      // far behind: at least 90% of them should have been delivered.
      const CORBA::ULong required =
        static_cast<CORBA::ULong> (LIVENESS_TOLERANCE * sent);

      if (received < required)
        {
          verdict = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d "
                      "not enough messages received %u "
                      "expected %u\n",
                      round, received, required));

          // Let the server catch up before the next round.
          sync_server (flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
        }

      // Force a periodic sync so the unsynchronized run stays bounded.
      if (since_last_sync == LIVENESS_MAX_DEPTH)
        {
          sync_server (flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
          since_last_sync = 0;
        }
      else
        {
          ++since_last_sync;
        }

      ++round;
    }

  return verdict;
}
int
run_message_count (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL)
{
TAO::BufferingConstraint buffering_constraint;
buffering_constraint.mode = TAO::BUFFER_MESSAGE_COUNT;
buffering_constraint.message_count = BUFFERED_MESSAGES_COUNT;
buffering_constraint.message_bytes = 0;
buffering_constraint.timeout = 0;
Test::Oneway_Buffering_var flusher;
int test_failed =
configure_policies (orb, buffering_constraint,
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?