client.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 792 行 · 第 1/2 页
CPP
792 行
oneway_buffering, flusher.out ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (test_failed != 0)
return test_failed;
Test::Payload payload (PAYLOAD_LENGTH);
payload.length (PAYLOAD_LENGTH);
for (int j = 0; j != PAYLOAD_LENGTH; ++j)
payload[j] = CORBA::Octet(j % 256);
CORBA::ULong send_count = 0;
for (int i = 0; i != iterations; ++i)
{
sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong initial_receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (initial_receive_count != send_count)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d message lost (%u != %u)\n",
i, initial_receive_count, send_count));
}
while (1)
{
oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
send_count++;
CORBA::ULong receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong iteration_count =
send_count - initial_receive_count;
if (receive_count != initial_receive_count)
{
if (iteration_count < CORBA::ULong(BUFFERED_MESSAGES_COUNT))
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d flush before "
"message count reached. "
"Iteration count = %u, Threshold = %u\n",
i,
iteration_count, BUFFERED_MESSAGES_COUNT));
}
break;
}
if (iteration_count > 3 * BUFFERED_MESSAGES_COUNT)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d no flush past "
"message count threshold. "
"Iteration count = %u, Threshold = %u\n",
i,
iteration_count, BUFFERED_MESSAGES_COUNT));
break;
}
}
}
int liveness_test_failed =
run_liveness_test (oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (liveness_test_failed)
test_failed = 1;
return test_failed;
}
int
run_timeout (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL)
{
TAO::BufferingConstraint buffering_constraint;
buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
buffering_constraint.message_count = 0;
buffering_constraint.message_bytes = 0;
buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;
Test::Oneway_Buffering_var flusher;
int test_failed =
configure_policies (orb, buffering_constraint,
oneway_buffering, flusher.out ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (test_failed != 0)
return test_failed;
Test::Payload payload (PAYLOAD_LENGTH);
payload.length (PAYLOAD_LENGTH);
for (int j = 0; j != PAYLOAD_LENGTH; ++j)
payload[j] = CORBA::Octet(j % 256);
CORBA::ULong send_count = 0;
for (int i = 0; i != iterations; ++i)
{
sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong initial_receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (initial_receive_count != send_count)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d message lost (%u != %u)\n",
i, initial_receive_count, send_count));
}
ACE_Time_Value start = ACE_OS::gettimeofday ();
while (1)
{
oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
send_count++;
CORBA::ULong receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
if (receive_count != initial_receive_count)
{
if (elapsed.msec () < TIMEOUT_MILLISECONDS)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d flush before "
"timeout expired. "
"Elapsed = %d, Timeout = %d msecs\n",
i,
elapsed.msec (), TIMEOUT_MILLISECONDS));
}
// terminate the while loop.
break;
}
if (elapsed.msec () > 3 * TIMEOUT_MILLISECONDS)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d no flush past "
"timeout threshold. "
"Elapsed = %d, Timeout = %d msecs\n",
i,
elapsed.msec (), TIMEOUT_MILLISECONDS));
break;
}
}
}
int liveness_test_failed =
run_liveness_test (oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (liveness_test_failed)
test_failed = 1;
return test_failed;
}
int
run_timeout_reactive (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL)
{
TAO::BufferingConstraint buffering_constraint;
buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
buffering_constraint.message_count = 0;
buffering_constraint.message_bytes = 0;
buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;
Test::Oneway_Buffering_var flusher;
int test_failed =
configure_policies (orb, buffering_constraint,
oneway_buffering, flusher.out ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (test_failed != 0)
return test_failed;
Test::Payload payload (PAYLOAD_LENGTH);
payload.length (PAYLOAD_LENGTH);
for (int j = 0; j != PAYLOAD_LENGTH; ++j)
payload[j] = CORBA::Octet(j % 256);
CORBA::ULong send_count = 0;
for (int i = 0; i != iterations; ++i)
{
sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong initial_receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (initial_receive_count != send_count)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d message lost (%u != %u)\n",
i, initial_receive_count, send_count));
}
ACE_Time_Value start = ACE_OS::gettimeofday ();
for (int j = 0; j != 20; ++j)
{
oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
send_count++;
}
while (1)
{
CORBA::ULong receive_count =
oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_Time_Value sleep (0, 10000);
orb->run (sleep ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
if (receive_count != initial_receive_count)
{
if (elapsed.msec () < TIMEOUT_MILLISECONDS)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d flush before "
"timeout expired. "
"Elapsed = %d, Timeout = %d msecs\n",
i,
elapsed.msec (), TIMEOUT_MILLISECONDS));
}
// terminate the while loop.
break;
}
if (elapsed.msec () > 3 * TIMEOUT_MILLISECONDS)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d no flush past "
"timeout threshold. "
"Elapsed = %d, Timeout = %d msecs\n",
i,
elapsed.msec (), TIMEOUT_MILLISECONDS));
break;
}
}
}
int liveness_test_failed =
run_liveness_test (oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (liveness_test_failed)
test_failed = 1;
return test_failed;
}
int
run_buffer_size (CORBA::ORB_ptr orb,
Test::Oneway_Buffering_ptr oneway_buffering,
Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
ACE_ENV_ARG_DECL)
{
TAO::BufferingConstraint buffering_constraint;
buffering_constraint.mode = TAO::BUFFER_MESSAGE_BYTES;
buffering_constraint.message_count = 0;
buffering_constraint.message_bytes = BUFFER_SIZE;
buffering_constraint.timeout = 0;
Test::Oneway_Buffering_var flusher;
int test_failed =
configure_policies (orb, buffering_constraint,
oneway_buffering, flusher.out ()
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (test_failed != 0)
return test_failed;
Test::Payload payload (PAYLOAD_LENGTH);
payload.length (PAYLOAD_LENGTH);
for (int j = 0; j != PAYLOAD_LENGTH; ++j)
payload[j] = CORBA::Octet(j % 256);
CORBA::ULong bytes_sent = 0;
for (int i = 0; i != iterations; ++i)
{
sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong initial_bytes_received =
oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (initial_bytes_received != bytes_sent)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d data lost (%u != %u)\n",
i, initial_bytes_received, bytes_sent));
}
while (1)
{
oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
bytes_sent += PAYLOAD_LENGTH;
CORBA::ULong bytes_received =
oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
CORBA::ULong payload_delta =
bytes_sent - initial_bytes_received;
if (bytes_received != initial_bytes_received)
{
// The queue has been flushed, check that enough data
// has been sent. The check cannot be precise because
// the ORB counts the GIOP message overhead, in this
// test we assume the overhead to be less than 10%
if (payload_delta < CORBA::ULong (GIOP_OVERHEAD * BUFFER_SIZE))
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d flush before "
"minimum buffer size was reached. "
"Sent = %u, Minimum buffer = %u bytes\n",
i,
payload_delta, BUFFER_SIZE));
}
break;
}
if (payload_delta > 3 * BUFFER_SIZE)
{
test_failed = 1;
ACE_DEBUG ((LM_DEBUG,
"DEBUG: Iteration %d no flush past "
"buffer size threshold. "
"Sent = %u, Minimum buffer = %u bytes\n",
i,
payload_delta, BUFFER_SIZE));
break;
}
}
}
int liveness_test_failed =
run_liveness_test (oneway_buffering,
flusher.in (),
oneway_buffering_admin
ACE_ENV_ARG_PARAMETER);
ACE_CHECK_RETURN (-1);
if (liveness_test_failed)
test_failed = 1;
return test_failed;
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?