client.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 889 行 · 第 1/2 页

CPP
889
字号
// client.cpp,v 1.9 2003/12/24 17:46:34 bala Exp

#include "Reply_Handler.h"
#include "Client_Task.h"
#include "tao/Messaging/Messaging.h"
#include "tao/TAOC.h"
#include "tao/TAOA.h"
#include "ace/Get_Opt.h"

ACE_RCSID(AMI_Buffering, client, "client.cpp,v 1.9 2003/12/24 17:46:34 bala Exp")

const char *server_ior = "file://server.ior";
const char *admin_ior = "file://admin.ior";
int iterations = 200;

int run_message_count_test = 0;
int run_timeout_test = 0;
int run_timeout_reactive_test = 0;
int run_buffer_size_test = 0;

const int PAYLOAD_LENGTH = 1024;
const int BUFFERED_MESSAGES_COUNT = 50;
const int TIMEOUT_MILLISECONDS = 50;
const int BUFFER_SIZE = 64 * PAYLOAD_LENGTH;

/// Check that no more than 10% of the messages are not sent.
const double LIVENESS_TOLERANCE = 0.9;

/// Limit the depth of the liveness test, avoid blowing up the stack
/// on the server
const int LIVENESS_MAX_DEPTH = 256;

/// Factor in GIOP overhead in the buffer size test
const double GIOP_OVERHEAD = 0.9;

int
parse_args (int argc, char *argv[])
{
  ACE_Get_Opt get_opts (argc, argv, "k:a:i:ctbr");
  int c;

  while ((c = get_opts ()) != -1)
    switch (c)
      {
      case 'k':
        server_ior = get_opts.opt_arg ();
        break;

      case 'a':
        admin_ior = get_opts.opt_arg ();
        break;

      case 'i':
        iterations = ACE_OS::atoi (get_opts.opt_arg ());
        break;

      case 'c':
        run_message_count_test = 1;
        break;

      case 't':
        run_timeout_test = 1;
        break;

      case 'b':
        run_buffer_size_test = 1;
        break;

      case 'r':
        run_timeout_reactive_test = 1;
        break;

      case '?':
      default:
        ACE_ERROR_RETURN ((LM_ERROR,
                           "usage:  %s "
                           "-k <server_ior> "
                           "-a <admin_ior> "
                           "-i <iterations> "
                           "<-c|-t|-b|-r> "
                           "\n",
                           argv [0]),
                          -1);
      }
  // Indicates sucessful parsing of the command line
  return 0;
}

int
run_message_count (CORBA::ORB_ptr orb,
                   Test::AMI_Buffering_ptr ami_buffering,
                   Test::AMI_Buffering_Admin_ptr ami_buffering_admin
                   ACE_ENV_ARG_DECL);
int
run_timeout (CORBA::ORB_ptr orb,
             Test::AMI_Buffering_ptr ami_buffering,
             Test::AMI_Buffering_Admin_ptr ami_buffering_admin
             ACE_ENV_ARG_DECL);

int
run_timeout_reactive (CORBA::ORB_ptr orb,
                      Test::AMI_Buffering_ptr oneway_buffering,
                      Test::AMI_Buffering_Admin_ptr oneway_buffering_admin
                      ACE_ENV_ARG_DECL);

int
run_buffer_size (CORBA::ORB_ptr orb,
                 Test::AMI_Buffering_ptr ami_buffering,
                 Test::AMI_Buffering_Admin_ptr ami_buffering_admin
                 ACE_ENV_ARG_DECL);

int
main (int argc, char *argv[])
{
  int test_failed = 0;
  ACE_TRY_NEW_ENV
    {
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc, argv, "" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      CORBA::Object_var poa_object =
        orb->resolve_initial_references("RootPOA" ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      PortableServer::POA_var root_poa =
        PortableServer::POA::_narrow (poa_object.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (root_poa.in ()))
        ACE_ERROR_RETURN ((LM_ERROR,
                           " (%P|%t) Panic: nil RootPOA\n"),
                          1);

      PortableServer::POAManager_var poa_manager =
        root_poa->the_POAManager (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      poa_manager->activate (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (parse_args (argc, argv) != 0)
        return 1;

      CORBA::Object_var tmp =
        orb->string_to_object(server_ior ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      Test::AMI_Buffering_var ami_buffering =
        Test::AMI_Buffering::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (ami_buffering.in ()))
        {
          ACE_ERROR_RETURN ((LM_DEBUG,
                             "Nil Test::AMI_Buffering reference <%s>\n",
                             server_ior),
                            1);
        }

      tmp =
        orb->string_to_object(admin_ior ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      Test::AMI_Buffering_Admin_var ami_buffering_admin =
        Test::AMI_Buffering_Admin::_narrow(tmp.in () ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      if (CORBA::is_nil (ami_buffering_admin.in ()))
        {
          ACE_ERROR_RETURN ((LM_DEBUG,
                             "Nil Test::AMI_Buffering_Admin reference <%s>\n",
                             admin_ior),
                            1);
        }

      Client_Task client_task (orb.in ());
      if (client_task.activate (THR_NEW_LWP | THR_JOINABLE) == -1)
        {
          ACE_ERROR ((LM_ERROR, "Error activating client task\n"));
        }

      if (run_message_count_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running message count flushing test\n"));
          test_failed =
            run_message_count (orb.in (),
                               ami_buffering.in (),
                               ami_buffering_admin.in ()
                               ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_timeout_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running timeout flushing test\n"));
          test_failed =
            run_timeout (orb.in (),
                         ami_buffering.in (),
                         ami_buffering_admin.in ()
                         ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_timeout_reactive_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running timeout (reactive) flushing test\n"));
          test_failed =
            run_timeout_reactive (orb.in (),
                                  ami_buffering.in (),
                                  ami_buffering_admin.in ()
                                  ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else if (run_buffer_size_test)
        {
          ACE_DEBUG ((LM_DEBUG,
                      "Running buffer size flushing test\n"));
          test_failed =
            run_buffer_size (orb.in (),
                             ami_buffering.in (),
                             ami_buffering_admin.in ()
                             ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      else
        {
          ACE_ERROR ((LM_ERROR,
                      "ERROR: No test was configured\n"));
        }

      client_task.terminate_loop ();

      client_task.thr_mgr ()->wait ();

      ami_buffering->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      ami_buffering_admin->shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      root_poa->destroy (1, 1 ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      orb->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught:");
      return 1;
    }
  ACE_ENDTRY;

  return test_failed;
}

int
configure_policies (CORBA::ORB_ptr orb,
                    const TAO::BufferingConstraint &buffering_constraint,
                    Test::AMI_Buffering_ptr ami_buffering,
                    Test::AMI_Buffering_out flusher
                    ACE_ENV_ARG_DECL)
{
  CORBA::Object_var object =
    orb->resolve_initial_references ("PolicyCurrent" ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::PolicyCurrent_var policy_current =
    CORBA::PolicyCurrent::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (CORBA::is_nil (policy_current.in ()))
    {
      ACE_ERROR ((LM_ERROR, "ERROR: Nil policy current\n"));
      return 1;
    }
  CORBA::Any scope_as_any;
  scope_as_any <<= Messaging::SYNC_NONE;

  CORBA::Any buffering_as_any;
  buffering_as_any <<= buffering_constraint;

  CORBA::PolicyList policies (2); policies.length (2);
  policies[0] =
    orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
                        scope_as_any
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  policies[1] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        buffering_as_any
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  policy_current->set_policy_overrides (policies, CORBA::ADD_OVERRIDE
                                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  policies[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);
  policies[1]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  TAO::BufferingConstraint flush_constraint;
  flush_constraint.mode = TAO::BUFFER_FLUSH;
  flush_constraint.message_count = 0;
  flush_constraint.message_bytes = 0;
  flush_constraint.timeout = 0;

  buffering_as_any <<= flush_constraint;
  policies.length (1);
  policies[0] =
    orb->create_policy (TAO::BUFFERING_CONSTRAINT_POLICY_TYPE,
                        buffering_as_any
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  object =
    ami_buffering->_set_policy_overrides (policies,
                                             CORBA::ADD_OVERRIDE
                                             ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  policies[0]->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  flusher =
    Test::AMI_Buffering::_narrow (object.in () ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  return 0;
}

void
sync_server (CORBA::ORB_ptr orb,
             Test::AMI_Buffering_ptr flusher
             ACE_ENV_ARG_DECL)
{
  // Get back in sync with the server...
  flusher->flush (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
  flusher->sync (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  // Drain responses from the queue
  ACE_Time_Value tv (0, 100000);
  orb->run (tv ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

int
run_liveness_test (CORBA::ORB_ptr orb,
                   Test::AMI_AMI_BufferingHandler_ptr reply_handler,
                   Test::AMI_Buffering_ptr ami_buffering,
                   Test::AMI_Buffering_ptr flusher,
                   Test::AMI_Buffering_Admin_ptr ami_buffering_admin
                   ACE_ENV_ARG_DECL)
{
  ACE_DEBUG ((LM_DEBUG, ".... checking for liveness\n"));
  int test_failed = 0;

  // Get back in sync with the server...
  sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  CORBA::ULong send_count =
    ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  int liveness_test_iterations = int(send_count);

  Test::Payload payload (PAYLOAD_LENGTH);
  payload.length (PAYLOAD_LENGTH);
  for (int j = 0; j != PAYLOAD_LENGTH; ++j)
    payload[j] = CORBA::Octet(j % 256);

  int depth = 0;
  for (int i = 0; i != liveness_test_iterations; ++i)
    {
      ami_buffering->sendc_receive_data (reply_handler,
                                         payload
                                         ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);
      send_count++;

      CORBA::ULong receive_count =
        ami_buffering_admin->request_count (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK_RETURN (-1);

      // Once the system has sent enough messages we don't
      // expect it to fall too far behind, i.e. at least 90% of the
      // messages should be delivered....
      CORBA::ULong expected =
        CORBA::ULong (LIVENESS_TOLERANCE * send_count);

      if (receive_count < expected)
        {
          test_failed = 1;
          ACE_DEBUG ((LM_DEBUG,
                      "DEBUG: Iteration %d "
                      "not enough messages received %u "
                      "expected %u\n",
                      i, receive_count, expected));

          sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);
        }

      if (depth++ == LIVENESS_MAX_DEPTH)
        {
          sync_server (orb, flusher ACE_ENV_ARG_PARAMETER);
          ACE_CHECK_RETURN (-1);

          depth = 0;
        }
    }

  return test_failed;
}

int
run_message_count (CORBA::ORB_ptr orb,
                   Test::AMI_Buffering_ptr ami_buffering,
                   Test::AMI_Buffering_Admin_ptr ami_buffering_admin
                   ACE_ENV_ARG_DECL)
{
  TAO::BufferingConstraint buffering_constraint;
  buffering_constraint.mode = TAO::BUFFER_MESSAGE_COUNT;
  buffering_constraint.message_count = BUFFERED_MESSAGES_COUNT;
  buffering_constraint.message_bytes = 0;
  buffering_constraint.timeout = 0;

  Test::AMI_Buffering_var flusher;
  int test_failed =
    configure_policies (orb, buffering_constraint,
                        ami_buffering, flusher.out ()
                        ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (-1);

  if (test_failed != 0)
    return test_failed;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?