rt_servant_dispatcher.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 401 行

CPP
401
字号
#include "RT_Servant_Dispatcher.h"

#if defined (TAO_HAS_CORBA_MESSAGING) && TAO_HAS_CORBA_MESSAGING != 0

#include "RT_POA.h"
#include "tao/RTCORBA/Thread_Pool.h"
#include "tao/ORB_Core.h"
#include "tao/TAO_Server_Request.h"
#include "tao/Transport.h"
#include "tao/IIOP_Transport.h"
#include "tao/IIOP_Connection_Handler.h"
#include "tao/Service_Context.h"
#include "tao/Protocols_Hooks.h"
#include "tao/debug.h"


ACE_RCSID (RTPortableServer,
           RT_Servant_Dispatcher,
           "RT_Servant_Dispatcher.cpp,v 1.15 2003/06/20 04:25:13 dhinton Exp")


TAO_RT_Servant_Dispatcher::~TAO_RT_Servant_Dispatcher (void)
{
}

void
TAO_RT_Servant_Dispatcher::pre_invoke_remote_request (
  TAO_POA &poa,
  CORBA::Short servant_priority,
  TAO_ServerRequest &req,
  TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
  ACE_ENV_ARG_DECL)
{
  TAO_Service_Context &request_service_context =
    req.request_service_context ();
  TAO_Service_Context &reply_service_context = req.reply_service_context ();

  TAO_Thread_Pool *thread_pool =
    (TAO_Thread_Pool *) poa.thread_pool ();

  if (thread_pool != 0 &&
      thread_pool->with_lanes ())
    {
      //
      // We don't mess with the priority of threads in lanes.
      //

      if (TAO_debug_level > 0)
        {
          // Get the ORB_Core's TSS resources.
          TAO_ORB_Core_TSS_Resources *tss =
            poa.orb_core ().get_tss_resources ();

          /// Get the lane attribute in TSS.
          TAO_Thread_Lane *lane =
            (TAO_Thread_Lane *) tss->lane_;

          ACE_ASSERT (lane->pool ().id () ==
                      thread_pool->id ());

          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("Using thread pool with lane ")
                      ACE_TEXT ("(%P|%t|%d|%d): original thread ")
                      ACE_TEXT ("CORBA/native priority %d/%d not changed\n"),
                      lane->pool ().id (),
                      lane->id (),
                      lane->lane_priority (),
                      lane->native_priority ()));
        }

      return;
    }

  // Remember current thread's priority.
  TAO_Protocols_Hooks *tph =
    poa.orb_core ().get_protocols_hooks (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (req.transport ()->tag () == IOP::TAG_INTERNET_IOP)
    {
      CORBA::Policy_var policy = poa.server_protocol ();

      int result =
        tph->update_server_protocol_properties (
          policy.in (),
          req.transport (),
          "iiop");

      if (result != 0)
        ACE_ERROR((LM_ERROR,
                   "Error in getting the effective protocol properties\n"));
    }

  const char *priority_model;
  RTCORBA::Priority target_priority = TAO_INVALID_PRIORITY;
  TAO_POA_Cached_Policies &cached_policies =
    poa.cached_policies ();

  // NOT_SPECIFIED PriorityModel processing.
  if (cached_policies.priority_model () ==
      TAO_POA_Cached_Policies::NOT_SPECIFIED)
    {
      priority_model = "RTCORBA::NOT_SPECIFIED";
    }

  // CLIENT_PROPAGATED PriorityModel processing.
  else if (cached_policies.priority_model () ==
      TAO_POA_Cached_Policies::CLIENT_PROPAGATED)
    {
      priority_model = "RTCORBA::CLIENT_PROPAGATED";

      // Attempt to extract client-propagated priority from the
      // ServiceContextList of the request.
      const IOP::ServiceContext *context;

      if (request_service_context.get_context (IOP::RTCorbaPriority,
                                               &context) == 1)
        {
          // Extract the target priority
          TAO_InputCDR cdr (ACE_reinterpret_cast
                            (const char*,
                             context->context_data.get_buffer ()),
                            context->context_data.length ());
          CORBA::Boolean byte_order;
          if ((cdr >> ACE_InputCDR::to_boolean (byte_order)) == 0)
            ACE_THROW (CORBA::MARSHAL ());
          cdr.reset_byte_order (ACE_static_cast(int,byte_order));

          if ((cdr >> target_priority) == 0)
            ACE_THROW (CORBA::MARSHAL ());

          // Save the target priority in the response service
          // context to propagate back to the client as specified
          // by the RTCORBA specification.
          reply_service_context.set_context (*context);
        }
      else
        {
          // Use default priority if none came in the request.
          // (Request must have come from a non-RT ORB.)
          target_priority = cached_policies.server_priority ();
        }
    }
  else
    // SERVER_DECLARED PriorityModel processing.
    {
      priority_model = "RTCORBA::SERVER_DECLARED";

      // Use the request associated with the servant.
      target_priority = servant_priority;
    }

  char thread_pool_id[BUFSIZ];
  if (TAO_debug_level > 0)
    {
      if (thread_pool == 0)
        ACE_OS::strcpy (thread_pool_id,
                        "default thread pool");
      else
        ACE_OS::sprintf (thread_pool_id,
                         "thread pool %d",
                         thread_pool->id ());
    }

  // Target priority is invalid.
  if (target_priority == TAO_INVALID_PRIORITY)
    {
      if (TAO_debug_level > 0)
        {

// If we are in a multi-threaded configuration, print out the current
// thread priority.
#if defined (ACE_HAS_THREADS)

          if (tph->get_thread_CORBA_and_native_priority (
                pre_invoke_state.original_CORBA_priority_,
                pre_invoke_state.original_native_priority_
                ACE_ENV_ARG_PARAMETER) == -1)
            ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                               CORBA::COMPLETED_NO));

          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("%s processing using %s ")
                      ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
                      ACE_TEXT ("not changed\n"),
                      priority_model,
                      thread_pool_id,
                      pre_invoke_state.original_CORBA_priority_,
                      pre_invoke_state.original_native_priority_));

// If we are in a single-threaded configuration, we cannot get the
// current thread priority.  Therefore, print out a simpler message.
#else /* ACE_HAS_THREADS */

          ACE_DEBUG ((LM_DEBUG,
                      ACE_TEXT ("%s processing using %s ")
                      ACE_TEXT ("(%P|%t): original thread CORBA/native priority ")
                      ACE_TEXT ("not changed\n"),
                      priority_model,
                      thread_pool_id));

#endif /* ACE_HAS_THREADS */

        }
    }
  else
    {
      // Get the current thread's priority.

      if (tph->get_thread_CORBA_and_native_priority (
            pre_invoke_state.original_CORBA_priority_,
            pre_invoke_state.original_native_priority_
            ACE_ENV_ARG_PARAMETER) == -1)
        ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                           CORBA::COMPLETED_NO));

      // Priority needs to be changed temporarily changed for the
      // duration of request.
      if (target_priority != pre_invoke_state.original_CORBA_priority_)
        {
          if (tph->set_thread_CORBA_priority (target_priority
                                              ACE_ENV_ARG_PARAMETER)
              == -1)
            ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                               CORBA::COMPLETED_NO));

          pre_invoke_state.state_ =
            TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;

          if (TAO_debug_level > 0)
            {
              CORBA::Short native_priority;
              tph->get_thread_native_priority (native_priority
                                               ACE_ENV_ARG_PARAMETER);

              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("%s processing using %s ")
                          ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
                          ACE_TEXT ("temporarily changed to CORBA/native priority %d/%d\n"),
                          priority_model,
                          thread_pool_id,
                          pre_invoke_state.original_CORBA_priority_,
                          pre_invoke_state.original_native_priority_,
                          target_priority,
                          native_priority));
            }
        }
      // No change in priority required.
      else
        {
          if (TAO_debug_level > 0)
            {
              ACE_DEBUG ((LM_DEBUG,
                          ACE_TEXT ("%s processing using %s ")
                          ACE_TEXT ("(%P|%t): original thread CORBA/native priority %d/%d ")
                          ACE_TEXT ("is the same as the target priority\n"),
                          priority_model,
                          thread_pool_id,
                          pre_invoke_state.original_CORBA_priority_,
                          pre_invoke_state.original_native_priority_));
            }
        }
    }
}

void
TAO_RT_Servant_Dispatcher::pre_invoke_collocated_request (TAO_POA &poa,
                                                          CORBA::Short servant_priority,
                                                          TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State &pre_invoke_state
                                                          ACE_ENV_ARG_DECL)
{
  TAO_Thread_Pool *thread_pool =
    (TAO_Thread_Pool *) poa.thread_pool ();

  if (thread_pool == 0 ||
      thread_pool->with_lanes ())
    {
      //
      // We don't mess with the priority of threads in lanes or for
      // the default thread pool.
      //
      return;
    }

  TAO_POA_Cached_Policies &cached_policies =
    poa.cached_policies ();

  if (cached_policies.priority_model () !=
      TAO_POA_Cached_Policies::SERVER_DECLARED ||
      servant_priority == TAO_INVALID_PRIORITY)
    {
      //
      // We either don't have server declared model or servant
      // priority is invalid.
      //
      return;
    }

  //
  // SERVER_DECLARED PriorityModel processing.
  //

  // Remember current thread's priority.
  TAO_Protocols_Hooks *tph =
    poa.orb_core ().get_protocols_hooks (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK;

  if (tph->get_thread_CORBA_and_native_priority (pre_invoke_state.original_CORBA_priority_,
                                                 pre_invoke_state.original_native_priority_
                                                 ACE_ENV_ARG_PARAMETER)
      == -1)
    ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                       CORBA::COMPLETED_NO));

  // Change the priority of the current thread for the duration of
  // request.
  if (servant_priority != pre_invoke_state.original_CORBA_priority_)
    {
      if (tph->set_thread_CORBA_priority (servant_priority
                                          ACE_ENV_ARG_PARAMETER)
          == -1)
        ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                           CORBA::COMPLETED_NO));

      pre_invoke_state.state_ =
        TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED;
    }
}

void
TAO_RT_Servant_Dispatcher::post_invoke (TAO_POA &poa,
                                        TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State &pre_invoke_state)

{
  if (pre_invoke_state.state_ ==
      TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State::PRIORITY_RESET_REQUIRED)
    {
      pre_invoke_state.state_ =
        TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State::NO_ACTION_REQUIRED;

      ACE_DECLARE_NEW_CORBA_ENV;

      ACE_TRY
        {
          // Reset the priority of the current thread back to its original
          // value.
          TAO_Protocols_Hooks *tph =
            poa.orb_core ().get_protocols_hooks (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;

          if (tph->set_thread_native_priority (
                 pre_invoke_state.original_native_priority_
                                               ACE_ENV_ARG_PARAMETER)
              == -1)
            ACE_THROW (CORBA::DATA_CONVERSION (CORBA::OMGVMCID | 2,
                                               CORBA::COMPLETED_NO));
          ACE_TRY_CHECK;
        }
      ACE_CATCHANY
        {
          // Eat up the exception.
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "Exception caught: TAO (%P|%t) - "
                               "Priority_Model_Processing::"
                               "~Priority_Model_Processing");
        }
      ACE_ENDTRY;
    }
}

TAO_POA *
TAO_RT_Servant_Dispatcher::create_POA (const ACE_CString &name,
                                       TAO_POA_Manager &poa_manager,
                                       const TAO_POA_Policy_Set &policies,
                                       TAO_POA *parent,
                                       ACE_Lock &lock,
                                       TAO_SYNCH_MUTEX &thread_lock,
                                       TAO_ORB_Core &orb_core,
                                       TAO_Object_Adapter *object_adapter
                                       ACE_ENV_ARG_DECL)
{
  TAO_RT_POA *poa;

  ACE_NEW_THROW_EX (poa,
                    TAO_RT_POA (name,
                                poa_manager,
                                policies,
                                parent,
                                lock,
                                thread_lock,
                                orb_core,
                                object_adapter
                                ACE_ENV_ARG_PARAMETER),
                    CORBA::NO_MEMORY ());
  ACE_CHECK_RETURN (0);

  return poa;
}

#endif /* TAO_HAS_CORBA_MESSAGING && TAO_HAS_CORBA_MESSAGING != 0 */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?