collocation.cpp

来自「这是广泛使用的通信开源项目，对于大容量、高并发的通信要求完全能够胜任，它广泛可用」· C++ 代码 · 共 991 行 · 第 1/2 页

CPP
991
字号
                                      this->static_threads_,
                                      this->dynamic_threads_,
                                      default_thread_priority,
                                      this->allow_request_buffering_,
                                      this->max_buffered_requests_,
                                      this->max_request_buffer_size_
                                      ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Policy_var threadpool_policy =
    this->rt_orb_->create_threadpool_policy (threadpool_id
                                             ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Policy_var priority_model_policy =
    this->rt_orb_->create_priority_model_policy (priority_model,
                                                 default_thread_priority
                                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::PolicyList policies;

  policies.length (policies.length () + 1);
  policies[policies.length () - 1] =
    threadpool_policy;

  if (set_priority_model)
    {
      policies.length (policies.length () + 1);
      policies[policies.length () - 1] =
        priority_model_policy;
    }

  PortableServer::POA_var poa =
    this->root_poa_->create_POA (poa_name,
                                 this->poa_manager_.in (),
                                 policies
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  RTPortableServer::POA_var rt_poa =
    RTPortableServer::POA::_narrow (poa.in ()
                                    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  test_i *servant1 = 0;
  ACE_NEW_THROW_EX (servant1,
                    test_i (this->orb_.in (),
                            poa.in (),
                            this->tests_
                            ACE_ENV_ARG_PARAMETER),
                    CORBA::NO_MEMORY ());
  ACE_CHECK;

  servant1->in_lane (0);

  PortableServer::ObjectId_var id1;

  if (set_priority_model)
    {
      if (priority_model == RTCORBA::CLIENT_PROPAGATED)
        {
          servant1->invocation_pool_and_lane (1, 0);
          servant1->client_propagated (1);

          id1 =
            rt_poa->activate_object (servant1
                                     ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;
        }
      else
        {
          test_i *servant2 = 0;

          ACE_NEW_THROW_EX (servant2,
                            test_i (this->orb_.in (),
                                    poa.in (),
                                    this->tests_
                                    ACE_ENV_ARG_PARAMETER),
                            CORBA::NO_MEMORY ());
          ACE_CHECK;

          servant2->in_lane (0);

          PortableServer::ServantBase_var safe_servant2 (servant2);

          PortableServer::ObjectId_var id2;

          servant1->invocation_pool_and_lane (2, 0);
          servant2->invocation_pool_and_lane (2, 0);
          servant1->server_declared (1);
          servant2->server_declared (1);
          servant1->server_priority (default_thread_priority + 1);
          servant2->server_priority (default_thread_priority);

          id1 =
            rt_poa->activate_object_with_priority (servant1,
                                                   default_thread_priority + 1
                                                   ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;

          id2 =
            rt_poa->activate_object_with_priority (servant2,
                                                   default_thread_priority
                                                   ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;

          CORBA::Object_var object2 =
            poa->id_to_reference (id2.in ()
                                  ACE_ENV_ARG_PARAMETER);
          ACE_CHECK;

          this->tests_.size (this->tests_.size () + 1);
          this->tests_[this->tests_.size () - 1].object_ =
            test::_narrow (object2.in ()
                           ACE_ENV_ARG_PARAMETER);
          this->tests_[this->tests_.size () - 1].servant_ =
            servant2;
          ACE_CHECK;
        }
    }
  else
    {
      servant1->invocation_pool_and_lane (3, 0);

      id1 =
        rt_poa->activate_object (servant1
                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }

  PortableServer::ServantBase_var safe_servant (servant1);

  CORBA::Object_var object1 =
    poa->id_to_reference (id1.in ()
                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->tests_.size (this->tests_.size () + 1);
  this->tests_[this->tests_.size () - 1].object_ =
    test::_narrow (object1.in ()
                   ACE_ENV_ARG_PARAMETER);
  this->tests_[this->tests_.size () - 1].servant_ =
    servant1;
  ACE_CHECK;
}

// Creates a child POA of the root POA that carries two policies -- a
// thread pool *with lanes* policy and a priority model policy of kind
// <priority_model> -- activates one or two test servants in that POA
// and appends an (object reference, servant) entry to <tests_> for
// each activated servant.
//
// @param poa_name        Name passed to create_POA () for the new POA.
// @param priority_model  Priority model used for the priority model
//                        policy.  RTCORBA::CLIENT_PROPAGATED activates
//                        a single servant without an explicit
//                        priority; any other value activates two
//                        servants with explicit priorities.
//
// Errors are reported through the ACE_ENV / ACE_CHECK macros.  Every
// ACE_CHECK directly follows the call it guards, so nothing else runs
// between a failing call and its check.
void
Server::create_poa_and_servant_with_tp_with_lanes_policy (const char *poa_name,
                                                          RTCORBA::PriorityModel priority_model
                                                          ACE_ENV_ARG_DECL)
{
  // Two lanes with identical thread counts: lane 0 at the default
  // priority, lane 1 one priority level above it.
  RTCORBA::ThreadpoolLanes lanes (2);
  lanes.length (2);

  lanes[0].lane_priority = default_thread_priority;
  lanes[0].static_threads = this->static_threads_;
  lanes[0].dynamic_threads = this->dynamic_threads_;

  lanes[1].lane_priority = default_thread_priority + 1;
  lanes[1].static_threads = this->static_threads_;
  lanes[1].dynamic_threads = this->dynamic_threads_;

  RTCORBA::ThreadpoolId threadpool_id =
    this->rt_orb_->create_threadpool_with_lanes (this->stacksize_,
                                                 lanes,
                                                 this->allow_borrowing_,
                                                 this->allow_request_buffering_,
                                                 this->max_buffered_requests_,
                                                 this->max_request_buffer_size_
                                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Policy_var threadpool_policy =
    this->rt_orb_->create_threadpool_policy (threadpool_id
                                             ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  CORBA::Policy_var priority_model_policy =
    this->rt_orb_->create_priority_model_policy (priority_model,
                                                 default_thread_priority
                                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Policy list for the new POA: thread pool first, priority model
  // second.
  CORBA::PolicyList policies;

  policies.length (policies.length () + 1);
  policies[policies.length () - 1] =
    threadpool_policy;

  policies.length (policies.length () + 1);
  policies[policies.length () - 1] =
    priority_model_policy;

  PortableServer::POA_var poa =
    this->root_poa_->create_POA (poa_name,
                                 this->poa_manager_.in (),
                                 policies
                                 ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // The RT-specific POA interface is needed for
  // activate_object_with_priority () below.
  RTPortableServer::POA_var rt_poa =
    RTPortableServer::POA::_narrow (poa.in ()
                                    ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  test_i *servant1 = 0;
  ACE_NEW_THROW_EX (servant1,
                    test_i (this->orb_.in (),
                            poa.in (),
                            this->tests_
                            ACE_ENV_ARG_PARAMETER),
                    CORBA::NO_MEMORY ());
  ACE_CHECK;

  servant1->in_lane (1);

  // Hand the servant to a ServantBase_var right away so that the early
  // returns below do not have to release it by hand.
  PortableServer::ServantBase_var safe_servant1 (servant1);

  PortableServer::ObjectId_var id1;

  if (priority_model == RTCORBA::CLIENT_PROPAGATED)
    {
      // Single servant, activated without an explicit priority.
      servant1->invocation_pool (4);
      servant1->client_propagated (1);

      id1 =
        rt_poa->activate_object (servant1
                                 ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;
    }
  else
    {
      // Two servants: servant1 is activated at
      // default_thread_priority + 1, servant2 at
      // default_thread_priority.
      test_i *servant2 = 0;

      ACE_NEW_THROW_EX (servant2,
                        test_i (this->orb_.in (),
                                poa.in (),
                                this->tests_
                                ACE_ENV_ARG_PARAMETER),
                        CORBA::NO_MEMORY ());
      ACE_CHECK;

      servant2->in_lane (1);

      PortableServer::ServantBase_var safe_servant2 (servant2);

      PortableServer::ObjectId_var id2;

      servant1->invocation_pool_and_lane (5, 1);
      servant2->invocation_pool_and_lane (5, 0);
      servant1->server_declared (1);
      servant2->server_declared (1);
      servant1->server_priority (default_thread_priority + 1);
      servant2->server_priority (default_thread_priority);

      id1 =
        rt_poa->activate_object_with_priority (servant1,
                                               default_thread_priority + 1
                                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      id2 =
        rt_poa->activate_object_with_priority (servant2,
                                               default_thread_priority
                                               ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      CORBA::Object_var object2 =
        poa->id_to_reference (id2.in ()
                              ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      // Append the entry for servant2.  The check comes straight after
      // _narrow () so a failed narrow is noticed before the entry is
      // touched any further.
      this->tests_.size (this->tests_.size () + 1);
      this->tests_[this->tests_.size () - 1].object_ =
        test::_narrow (object2.in ()
                       ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      this->tests_[this->tests_.size () - 1].servant_ =
        servant2;
    }

  CORBA::Object_var object1 =
    poa->id_to_reference (id1.in ()
                          ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  // Append the entry for servant1 (same pattern as above: check the
  // narrow first, then record the servant).
  this->tests_.size (this->tests_.size () + 1);
  this->tests_[this->tests_.size () - 1].object_ =
    test::_narrow (object1.in ()
                   ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->tests_[this->tests_.size () - 1].servant_ =
    servant1;
}

// Runs one pass over all entries of <tests_>: each entry is first
// handed to setup_test_parameters () together with <current_> and
// <orb_>, then start () is invoked on the entry's object reference.
// The pass stops at the first call that reports an error through the
// ACE_CHECK macro.
void
Server::start_testing (ACE_ENV_SINGLE_ARG_DECL)
{
  for (Tests::ITERATOR cursor (this->tests_);
       !cursor.done ();
       cursor.advance ())
    {
      // Fetch the element the iterator currently points at.
      Test_Object_And_Servant *entry = 0;
      cursor.next (entry);

      setup_test_parameters (entry,
                             this->current_.in (),
                             this->orb_.in ()
                             ACE_ENV_ARG_PARAMETER);
      ACE_CHECK;

      entry->object_->start (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_CHECK;
    }
}

// Runs the complete set of tests twice: a first pass without touching
// the priority, then a second pass after setting the priority on
// <current_> to default_thread_priority + 1.
void
Server::test (ACE_ENV_SINGLE_ARG_DECL)
{
  // Pass 1.
  this->start_testing (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  ACE_DEBUG ((LM_DEBUG,
              "\n\n*** Changing priority to be higher ***\n\n"));

  // Pass 2, one priority level above the default.
  const int raised_priority = default_thread_priority + 1;

  this->current_->the_priority (raised_priority
                                ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->start_testing (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

// Shuts <orb_> down and then destroys it.  destroy () is only reached
// when shutdown () did not report an error.
void
Server::shutdown (ACE_ENV_SINGLE_ARG_DECL)
{
  // NOTE(review): the argument is presumably the ORB's
  // wait_for_completion flag -- confirm against CORBA::ORB::shutdown ().
  const CORBA::Boolean wait_for_completion = 1;

  this->orb_->shutdown (wait_for_completion ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;

  this->orb_->destroy (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;
}

// ACE task whose svc () method drives the whole test (see Task::svc ()
// below).  It holds the ORB that the test runs against.
class Task : public ACE_Task_Base
{
public:

  // Associates the task with <thread_manager> and stores a duplicated
  // reference to <orb>.
  Task (ACE_Thread_Manager &thread_manager,
        CORBA::ORB_ptr orb);

  // Entry point executed once the task has been activated.
  virtual int svc (void);

  // ORB reference handed to the Server object created in svc ().
  CORBA::ORB_var orb_;

};

// Passes the address of <thread_manager> to the ACE_Task_Base base
// class and keeps a duplicated reference to <orb> in <orb_>.
Task::Task (ACE_Thread_Manager &thread_manager,
            CORBA::ORB_ptr orb)
  : ACE_Task_Base (&thread_manager)
  , orb_ (CORBA::ORB::_duplicate (orb))
{
  // Nothing else to set up.
}

// Body of the task: builds a Server on top of <orb_>, creates a servant
// in the root POA and five further POAs with their servants (three
// with a plain thread pool policy, two with a thread pool with lanes
// policy), runs the tests and finally shuts the server down.
//
// @return 0 if every step completed, -1 if an exception was caught
//         (the exception is printed before returning).
int
Task::svc (void)
{
  // Values for the second argument of
  // create_poa_and_servant_with_tp_policy ().
  // NOTE(review): presumably the "set priority model" switch --
  // confirm against the method's declaration.
  const int with_priority_model = 1;
  const int without_priority_model = 0;

  ACE_TRY_NEW_ENV
    {
      Server server (this->orb_.in ()
                     ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Servant living directly in the root POA.
      server.create_servant_in_root_poa (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // POAs using a thread pool policy.
      server.create_poa_and_servant_with_tp_policy ("tp_client_propagated_poa",
                                                    with_priority_model,
                                                    RTCORBA::CLIENT_PROPAGATED
                                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      server.create_poa_and_servant_with_tp_policy ("tp_server_declared_poa",
                                                    with_priority_model,
                                                    RTCORBA::SERVER_DECLARED
                                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      server.create_poa_and_servant_with_tp_policy ("tp_no_priority_model_poa",
                                                    without_priority_model,
                                                    RTCORBA::CLIENT_PROPAGATED
                                                    ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // POAs using a thread pool with lanes policy.
      server.create_poa_and_servant_with_tp_with_lanes_policy ("tp_with_lanes_client_propagated_poa",
                                                               RTCORBA::CLIENT_PROPAGATED
                                                               ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      server.create_poa_and_servant_with_tp_with_lanes_policy ("tp_with_lanes_server_declared_poa",
                                                               RTCORBA::SERVER_DECLARED
                                                               ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Everything is set up: run the tests, then tear down.
      server.test (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;

      server.shutdown (ACE_ENV_SINGLE_ARG_PARAMETER);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught:");
      return -1;
    }
  ACE_ENDTRY;

  return 0;
}

// Program entry point.
//
// Initializes the ORB from the command line, calls
// check_supported_priorities (), then runs the test in a separate
// Task thread that is created with the ORB's thread creation flags,
// and waits for that thread to finish.
//
// @param argc  Argument count, passed on to CORBA::ORB_init ().
// @param argv  Argument vector, passed on to CORBA::ORB_init ().
//
// @return 0 if the task thread was started and waited for,
//         2 if the thread could not be created because of missing
//           privileges (errno == EPERM),
//        -1 if thread creation or the wait failed for any other
//           reason, or if an exception was caught.
int
main (int argc, char *argv[])
{
  ACE_TRY_NEW_ENV
    {
      CORBA::ORB_var orb =
        CORBA::ORB_init (argc,
                         argv,
                         ""
                         ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // Make sure we can support multiple priorities that are
      // required for this test.
      check_supported_priorities (orb.in ());

      // Thread Manager for managing task.
      ACE_Thread_Manager thread_manager;

      // Create task.
      Task task (thread_manager,
                 orb.in ());

      // Task activation flags.
      long flags =
        THR_NEW_LWP |
        THR_JOINABLE |
        orb->orb_core ()->orb_params ()->thread_creation_flags ();

      // Activate task.
      int result =
        task.activate (flags);
      if (result == -1)
        {
          if (errno == EPERM)
            {
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "Cannot create thread with scheduling policy %s\n"
                                 "because the user does not have the appropriate privileges, terminating program....\n"
                                 "Check svc.conf options and/or run as root\n",
                                 sched_policy_name (orb->orb_core ()->orb_params ()->ace_sched_policy ())),
                                2);
            }
          else
            {
              // Unexpected error.  The assertion alone is not enough:
              // when assertions are compiled out it does nothing, and
              // the program would go on to report success without
              // ever having run the test.  So also fail explicitly.
              ACE_ASSERT (0);
              ACE_ERROR_RETURN ((LM_ERROR,
                                 "%p\n",
                                 "Unexpected error while activating task"),
                                -1);
            }
        }

      // Wait for task to exit.
      result =
        thread_manager.wait ();
      ACE_ASSERT (result != -1);
      if (result == -1)
        {
          // Same reasoning as above: do not rely on the assertion.
          ACE_ERROR_RETURN ((LM_ERROR,
                             "%p\n",
                             "Unexpected error while waiting for task"),
                            -1);
        }
    }
  ACE_CATCHANY
    {
      ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                           "Exception caught:");
      return -1;
    }
  ACE_ENDTRY;

  return 0;
}


// Explicit instantiations of the ACE array container and its iterator
// for Test_Object_And_Servant.  They are only emitted when the build
// configuration asks for them: as "template class" declarations if
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, or as
// "#pragma instantiate" directives if
// ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined.  Otherwise nothing
// is emitted here.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Array_Base<Test_Object_And_Servant>;
template class ACE_Array_Iterator<Test_Object_And_Servant>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Array_Base<Test_Object_And_Servant>
#pragma instantiate ACE_Array_Iterator<Test_Object_And_Servant>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?