client.cpp

来自「这是广泛使用的通信开源项目,对于大容量、高并发的通讯要求完全能够胜任,它广泛可用」· C++ 代码 · 共 792 行 · 第 1/2 页

CPP
792
字号
                        oneway_buffering, flusher.out ()
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (test_failed != 0)
    return test_failed;

  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int j = 0; j != PAYLOAD_LENGTH; ++j)
    payload[j] = CORBA::Octet(j % 256);

  CORBA::ULong send_count = 0;
  for (int i = 0; i != iterations; ++i)
    {
      sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      CORBA::ULong initial_receive_count =
        oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      if (initial_receive_count != send_count)
        {
          test_failed = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d message lost (%u != %u)\n",
                      i, initial_receive_count, send_count));
        }

      while (1)
        {
          oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
          send_count++;

          CORBA::ULong receive_count =
            oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          CORBA::ULong iteration_count =
            send_count - initial_receive_count;
          if (receive_count != initial_receive_count)
            {
              if (iteration_count < CORBA::ULong(BUFFERED_MESSAGES_COUNT))
                {
                  test_failed = 1;
                  ACE_DEBUG ((LM_DEBUG,
                              "DEBUG: Iteration %d flush before "
                              "message count reached. "
                              "Iteration count = %u, Threshold = %u\n",
                              i,
                              iteration_count, BUFFERED_MESSAGES_COUNT));
                }
              break;
            }

          if (iteration_count > 3 * BUFFERED_MESSAGES_COUNT)
            {
              test_failed = 1;
              ACE_DEBUG ((LM_DEBUG,
                          "DEBUG: Iteration %d no flush past "
                          "message count threshold. "
                          "Iteration count = %u, Threshold = %u\n",
                          i,
                          iteration_count, BUFFERED_MESSAGES_COUNT));
              break;
            }
        }
    }

  int liveness_test_failed =
    run_liveness_test (oneway_buffering,
                       flusher.in (),
                       oneway_buffering_admin
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (liveness_test_failed)
    test_failed = 1;

  return test_failed;
}

// Check the TAO::BUFFER_TIMEOUT buffering constraint with a busy send
// loop.
//
// After configure_policies() has produced the flusher reference, the
// test performs `iterations' rounds.  Each round synchronizes the
// server through the flusher, records the request count reported by
// oneway_buffering_admin, and then keeps sending the payload through
// oneway_buffering until that count changes.  A round is marked as
// failed when the change is observed before TIMEOUT_MILLISECONDS have
// elapsed, or when no change has been observed after three times that
// interval.  Finally run_liveness_test() is executed and a failure
// there also fails this test.
//
// Returns 0 when every check passed and 1 when any check failed.  If
// configure_policies() reports a non-zero status that status is
// returned unchanged.  -1 is the value handed to ACE_CHECK_RETURN
// (presumably returned when an exception is pending -- confirm against
// the ACE environment macros).
int
run_timeout (CORBA::ORB_ptr orb,
              Test::Oneway_Buffering_ptr oneway_buffering,
              Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
              ACE_ENV_ARG_DECL)
{
  TAO::BufferingConstraint buffering_constraint;
  buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
  buffering_constraint.message_count = 0;
  buffering_constraint.message_bytes = 0;
  // NOTE(review): the factor presumably converts milliseconds into the
  // 100ns units used by the timeout field -- confirm.
  buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;

  Test::Oneway_Buffering_var flusher;
  int test_failed =
    configure_policies (orb, buffering_constraint,
                        oneway_buffering, flusher.out ()
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (test_failed != 0)
    return test_failed;

  // Payload with a repeating 0..255 byte pattern.
  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int j = 0; j != PAYLOAD_LENGTH; ++j)
    payload[j] = CORBA::Octet(j % 256);

  // Running total of requests sent over all iterations; it is compared
  // with the server-side count at the start of every round.
  CORBA::ULong send_count = 0;
  for (int i = 0; i != iterations; ++i)
    {
      sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      CORBA::ULong initial_receive_count =
        oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      if (initial_receive_count != send_count)
        {
          test_failed = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d message lost (%u != %u)\n",
                      i, initial_receive_count, send_count));
        }

      ACE_Time_Value start = ACE_OS::gettimeofday ();
      while (1)
        {
          oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
          send_count++;

          CORBA::ULong receive_count =
            oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
          // msec() does not return a plain int, yet the "%d"
          // conversions below consume one.  Narrow the value once so
          // the logged argument matches its format specifier and the
          // comparisons do not mix integer types.  The intervals
          // measured here are a few multiples of TIMEOUT_MILLISECONDS,
          // so the narrowing is not expected to lose information.
          const int elapsed_msec = static_cast<int> (elapsed.msec ());
          if (receive_count != initial_receive_count)
            {
              if (elapsed_msec < TIMEOUT_MILLISECONDS)
                {
                  test_failed = 1;
                  ACE_DEBUG ((LM_DEBUG,
                              "DEBUG: Iteration %d flush before "
                              "timeout expired. "
                              "Elapsed = %d, Timeout = %d msecs\n",
                              i,
                              elapsed_msec, TIMEOUT_MILLISECONDS));
                }
              // terminate the while loop.
              break;
            }

          // Give up on this round once three timeout periods have
          // passed without the server count moving.
          if (elapsed_msec > 3 * TIMEOUT_MILLISECONDS)
            {
              test_failed = 1;
              ACE_DEBUG ((LM_DEBUG,
                          "DEBUG: Iteration %d no flush past "
                          "timeout threshold. "
                          "Elapsed = %d, Timeout = %d msecs\n",
                          i,
                          elapsed_msec, TIMEOUT_MILLISECONDS));
              break;
            }
        }
    }

  int liveness_test_failed =
    run_liveness_test (oneway_buffering,
                       flusher.in (),
                       oneway_buffering_admin
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (liveness_test_failed)
    test_failed = 1;


  return test_failed;
}

// Check the TAO::BUFFER_TIMEOUT buffering constraint while the client
// spends its waiting time inside the ORB event loop.
//
// After configure_policies() has produced the flusher reference, the
// test performs `iterations' rounds.  Each round synchronizes the
// server through the flusher, records the request count reported by
// oneway_buffering_admin, sends a fixed burst of 20 payloads through
// oneway_buffering, and then polls the request count, running the ORB
// for a short interval between polls.  A round is marked as failed
// when a changed count is seen before TIMEOUT_MILLISECONDS have
// elapsed, or when no change has been seen after three times that
// interval.  Finally run_liveness_test() is executed and a failure
// there also fails this test.
//
// Returns 0 when every check passed and 1 when any check failed.  If
// configure_policies() reports a non-zero status that status is
// returned unchanged.  -1 is the value handed to ACE_CHECK_RETURN
// (presumably returned when an exception is pending -- confirm against
// the ACE environment macros).
int
run_timeout_reactive (CORBA::ORB_ptr orb,
                      Test::Oneway_Buffering_ptr oneway_buffering,
                      Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
                      ACE_ENV_ARG_DECL)
{
  TAO::BufferingConstraint buffering_constraint;
  buffering_constraint.mode = TAO::BUFFER_TIMEOUT;
  buffering_constraint.message_count = 0;
  buffering_constraint.message_bytes = 0;
  // NOTE(review): the factor presumably converts milliseconds into the
  // 100ns units used by the timeout field -- confirm.
  buffering_constraint.timeout = TIMEOUT_MILLISECONDS * 10000;

  Test::Oneway_Buffering_var flusher;
  int test_failed =
    configure_policies (orb, buffering_constraint,
                        oneway_buffering, flusher.out ()
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (test_failed != 0)
    return test_failed;

  // Payload with a repeating 0..255 byte pattern.
  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int j = 0; j != PAYLOAD_LENGTH; ++j)
    payload[j] = CORBA::Octet(j % 256);

  // Running total of requests sent over all iterations; it is compared
  // with the server-side count at the start of every round.
  CORBA::ULong send_count = 0;
  for (int i = 0; i != iterations; ++i)
    {
      sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      CORBA::ULong initial_receive_count =
        oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      if (initial_receive_count != send_count)
        {
          test_failed = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d message lost (%u != %u)\n",
                      i, initial_receive_count, send_count));
        }

      ACE_Time_Value start = ACE_OS::gettimeofday ();
      // Unlike the busy-loop variant, only a fixed burst is sent; the
      // rest of the round just waits and polls.
      for (int j = 0; j != 20; ++j)
        {
          oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
          send_count++;
        }
      while (1)
        {
          CORBA::ULong receive_count =
            oneway_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          // Run the ORB for up to 10000 microseconds.
          ACE_Time_Value sleep (0, 10000);
          orb->run (sleep ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          // NOTE(review): receive_count was sampled before the ORB ran,
          // while the elapsed time is taken afterwards, so the reported
          // time can exceed the moment the change was seen by roughly
          // one polling interval.  Ordering kept as in the original.
          ACE_Time_Value elapsed = ACE_OS::gettimeofday () - start;
          // msec() does not return a plain int, yet the "%d"
          // conversions below consume one.  Narrow the value once so
          // the logged argument matches its format specifier and the
          // comparisons do not mix integer types.  The intervals
          // measured here are a few multiples of TIMEOUT_MILLISECONDS,
          // so the narrowing is not expected to lose information.
          const int elapsed_msec = static_cast<int> (elapsed.msec ());
          if (receive_count != initial_receive_count)
            {
              if (elapsed_msec < TIMEOUT_MILLISECONDS)
                {
                  test_failed = 1;
                  ACE_DEBUG ((LM_DEBUG,
                              "DEBUG: Iteration %d flush before "
                              "timeout expired. "
                              "Elapsed = %d, Timeout = %d msecs\n",
                              i,
                              elapsed_msec, TIMEOUT_MILLISECONDS));
                }
              // terminate the while loop.
              break;
            }

          // Give up on this round once three timeout periods have
          // passed without the server count moving.
          if (elapsed_msec > 3 * TIMEOUT_MILLISECONDS)
            {
              test_failed = 1;
              ACE_DEBUG ((LM_DEBUG,
                          "DEBUG: Iteration %d no flush past "
                          "timeout threshold. "
                          "Elapsed = %d, Timeout = %d msecs\n",
                          i,
                          elapsed_msec, TIMEOUT_MILLISECONDS));
              break;
            }
        }
    }

  int liveness_test_failed =
    run_liveness_test (oneway_buffering,
                       flusher.in (),
                       oneway_buffering_admin
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (liveness_test_failed)
    test_failed = 1;


  return test_failed;
}

// Check the TAO::BUFFER_MESSAGE_BYTES buffering constraint.
//
// Once configure_policies() has produced the flusher reference, the
// test performs `iterations' rounds.  A round synchronizes the server
// through the flusher, records the byte count reported by
// oneway_buffering_admin, and keeps sending the payload through
// oneway_buffering until that byte count changes.  The round fails
// when the change shows up before GIOP_OVERHEAD * BUFFER_SIZE payload
// bytes were sent, or when more than 3 * BUFFER_SIZE payload bytes
// were sent without any change.  run_liveness_test() is executed at
// the end and a failure there fails this test as well.
//
// Returns 0 if all checks passed, 1 if any failed, or the non-zero
// status of configure_policies() when that call fails.  -1 is the
// value handed to ACE_CHECK_RETURN.
int
run_buffer_size (CORBA::ORB_ptr orb,
                 Test::Oneway_Buffering_ptr oneway_buffering,
                 Test::Oneway_Buffering_Admin_ptr oneway_buffering_admin
                 ACE_ENV_ARG_DECL)
{
  // Only the byte threshold is active; count and timeout are zeroed.
  TAO::BufferingConstraint constraint;
  constraint.message_count = 0;
  constraint.timeout = 0;
  constraint.message_bytes = BUFFER_SIZE;
  constraint.mode = TAO::BUFFER_MESSAGE_BYTES;

  Test::Oneway_Buffering_var flusher;
  int status =
    configure_policies (orb, constraint,
                        oneway_buffering, flusher.out ()
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (status != 0)
    {
      return status;
    }

  // Build a payload holding a repeating 0..255 byte pattern.
  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  {
    int k = 0;
    while (k != PAYLOAD_LENGTH)
      {
        payload[k] = CORBA::Octet(k % 256);
        ++k;
      }
  }

  // Total payload bytes sent across all rounds.
  CORBA::ULong total_sent = 0;
  for (int round = 0; round != iterations; ++round)
    {
      sync_server (flusher.in () ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      CORBA::ULong baseline =
        oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      // After synchronizing, the server should have seen everything
      // that was sent so far.
      if (baseline != total_sent)
        {
          status = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d data lost (%u != %u)\n",
                      round, baseline, total_sent));
        }

      for (;;)
        {
          oneway_buffering->receive_data (payload ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
          total_sent += PAYLOAD_LENGTH;

          CORBA::ULong current =
            oneway_buffering_admin->bytes_received_count (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          // Payload bytes sent since the start of this round.
          CORBA::ULong sent_this_round = total_sent - baseline;

          if (current != baseline)
            {
              // A flush took place.  The lower bound is deliberately
              // loose: the ORB's own accounting includes GIOP message
              // overhead, which the original author assumed to stay
              // below 10%, so only GIOP_OVERHEAD * BUFFER_SIZE payload
              // bytes are required.
              CORBA::ULong minimum_expected =
                CORBA::ULong (GIOP_OVERHEAD * BUFFER_SIZE);
              if (sent_this_round < minimum_expected)
                {
                  status = 1;
                  ACE_DEBUG ((LM_DEBUG,
                              "DEBUG: Iteration %d flush before "
                              "minimum buffer size was reached. "
                              "Sent = %u, Minimum buffer = %u bytes\n",
                              round,
                              sent_this_round, BUFFER_SIZE));
                }
              break;
            }

          // No flush yet: stop the round once three buffers' worth of
          // payload has gone out.
          if (sent_this_round > 3 * BUFFER_SIZE)
            {
              status = 1;
              ACE_DEBUG ((LM_DEBUG,
                          "DEBUG: Iteration %d no flush past "
                          "buffer size threshold. "
                          "Sent = %u, Minimum buffer = %u bytes\n",
                          round,
                          sent_this_round, BUFFER_SIZE));
              break;
            }
        }
    }

  int liveness_status =
    run_liveness_test (oneway_buffering,
                       flusher.in (),
                       oneway_buffering_admin
                       ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (liveness_status != 0)
    {
      status = 1;
    }

  return status;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?