object_adapter.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 2,061 行 · 第 1/5 页

CPP
2,061
字号
{
  // Check if this is a nested non_servant_upcall.
  if (this->object_adapter_.non_servant_upcall_nesting_level_ != 0)
    {
      // Remember previous instance of non_servant_upcall.
      this->previous_ = this->object_adapter_.non_servant_upcall_in_progress_;

      // Assert that the thread is the same as the one before.
      ACE_ASSERT (ACE_OS::thr_equal (this->object_adapter_.non_servant_upcall_thread_,
                                     ACE_OS::thr_self ()));
    }

  // Remember which thread is calling the adapter activators.
  this->object_adapter_.non_servant_upcall_thread_ = ACE_OS::thr_self ();

  // Mark the fact that a non-servant upcall is in progress.
  this->object_adapter_.non_servant_upcall_in_progress_ = this;

  // Adjust the nesting level.
  this->object_adapter_.non_servant_upcall_nesting_level_++;

  // Release the Object Adapter lock.
  this->object_adapter_.lock ().release ();
}

TAO_Object_Adapter::Non_Servant_Upcall::~Non_Servant_Upcall (void)
{
  // Take the Object Adapter lock back before touching any adapter
  // state.  The result of acquire() is not checked.
  this->object_adapter_.lock ().acquire ();

  // Pop this upcall: whatever was recorded in <previous_> becomes the
  // upcall in progress again.
  this->object_adapter_.non_servant_upcall_in_progress_ = this->previous_;

  // One nesting level less.  A nested (inner) upcall has nothing more
  // to do; only the outermost one performs the steps below.
  --this->object_adapter_.non_servant_upcall_nesting_level_;
  if (this->object_adapter_.non_servant_upcall_nesting_level_ != 0)
    return;

  // No thread is making a non-servant upcall any longer.
  this->object_adapter_.non_servant_upcall_thread_ = ACE_OS::NULL_thread;

  // A POA that is waiting to be destroyed can be finished off once it
  // has no outstanding requests left.
  if (this->poa_.waiting_destruction () &&
      this->poa_.outstanding_requests () == 0)
    {
      ACE_TRY_NEW_ENV
        {
          this->poa_.complete_destruction_i (ACE_ENV_SINGLE_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      ACE_CATCHANY
        {
          // Exceptions are only reported, never propagated out of the
          // destructor.
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "TAO_POA::complete_destruction_i");
        }
      ACE_ENDTRY;
    }

  // With locking enabled, wake every thread blocked on the
  // non-servant upcall condition.
  if (this->object_adapter_.enable_locking_)
    this->object_adapter_.non_servant_upcall_condition_.broadcast ();
}

TAO_Object_Adapter::Servant_Upcall::Servant_Upcall (TAO_ORB_Core *oc)
  : object_adapter_ (0),
    poa_ (0),
    servant_ (0),
    state_ (INITIAL_STAGE),
    system_id_ (),
    user_id_ (0),
    current_context_ (),
#if (TAO_HAS_MINIMUM_POA == 0)
    cookie_ (0),
    operation_ (0),
#endif /* TAO_HAS_MINIMUM_POA == 0 */
    active_object_map_entry_ (0),
    using_servant_locator_ (0)
{
  // The ORB core hands out its POA adapter as a generic TAO_Adapter;
  // narrow it to the concrete object adapter this upcall works with.
  this->object_adapter_ =
    ACE_dynamic_cast (TAO_Object_Adapter *, oc->poa_adapter ());
}

// Drives prepare_for_upcall_i() until it produces a final answer.
// An attempt that fails only because it had to wait is cleaned up and
// repeated; any other result is handed straight back to the caller.
int
TAO_Object_Adapter::Servant_Upcall::prepare_for_upcall (
  const TAO::ObjectKey &key,
  const char *operation,
  CORBA::Object_out forward_to
  ACE_ENV_ARG_DECL)
{
  for (;;)
    {
      // Set by the attempt below when it waited and must be redone.
      int restart_needed = 0;

      const int status =
        this->prepare_for_upcall_i (key,
                                    operation,
                                    forward_to,
                                    restart_needed
                                    ACE_ENV_ARG_PARAMETER);
      ACE_CHECK_RETURN (TAO_Adapter::DS_FAILED);

      // Anything other than "failed because a wait occurred" is the
      // final outcome.
      if (status != TAO_Adapter::DS_FAILED || !restart_needed)
        return status;

      // We waited on a condition variable, so the POA state may have
      // changed in the meantime.  Undo what this attempt set up and
      // go around again.
      this->upcall_cleanup ();
    }
}

// One attempt at getting this upcall ready for dispatch.
//
// In order: acquire the object adapter lock, wait for non-servant
// upcalls to finish, locate the POA for <key>, check its POA manager
// state, set up the POA Current, bump the POA's outstanding request
// count, locate the servant, release the object adapter lock and
// finally run the single-threaded POA setup.  <state_> is advanced
// after each stage; upcall_cleanup() switches on it to decide what
// has to be undone.
//
// Returns:
//   TAO_Adapter::DS_OK      - <servant_> is ready for dispatching.
//   TAO_Adapter::DS_FORWARD - the servant lookup raised
//                             PortableServer::ForwardRequest (only
//                             caught when TAO_HAS_MINIMUM_CORBA == 0);
//                             <forward_to> receives a duplicate of
//                             the forward reference.
//   TAO_Adapter::DS_FAILED  - an exception was raised, or the servant
//                             lookup set <wait_occurred_restart_call>,
//                             in which case the caller is expected to
//                             clean up and retry.
int
TAO_Object_Adapter::Servant_Upcall::prepare_for_upcall_i (
  const TAO::ObjectKey &key,
  const char *operation,
  CORBA::Object_out forward_to,
  int &wait_occurred_restart_call
  ACE_ENV_ARG_DECL)
{
  // Acquire the object adapter lock first.
  int result = this->object_adapter_->lock ().acquire ();
  if (result == -1)
    // Locking error: <state_> is left untouched since nothing was
    // acquired.
    ACE_THROW_RETURN (CORBA::OBJ_ADAPTER (),
                      TAO_Adapter::DS_FAILED);

  // We have acquired the object adapter lock.  Record this for later
  // use.
  this->state_ = OBJECT_ADAPTER_LOCK_ACQUIRED;

  // Check if a non-servant upcall is in progress.  If a non-servant
  // upcall is in progress, wait for it to complete.  Unless of
  // course, the thread making the non-servant upcall is this thread.
  this->object_adapter_->wait_for_non_servant_upcalls_to_complete (
    ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (TAO_Adapter::DS_FAILED);

  // Locate the POA.  This fills in <system_id_> and <poa_>.
  this->object_adapter_->locate_poa (key,
                                     this->system_id_,
                                     this->poa_
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (TAO_Adapter::DS_FAILED);

  // Check the state of the POA Manager.
  this->poa_->check_poa_manager_state (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (TAO_Adapter::DS_FAILED);

  // Setup current for this request.
  this->current_context_.setup (this->poa_,
                                key);

  // Increase <poa->outstanding_requests_> for the duration of finding
  // the POA, finding the servant, and making the upcall.
  this->poa_->increment_outstanding_requests ();

  // We have setup the POA Current.  Record this for later use.
  this->state_ = POA_CURRENT_SETUP;

  ACE_TRY
    {
      ACE_FUNCTION_TIMEPROBE (TAO_POA_LOCATE_SERVANT_START);

      // Lookup the servant.
      this->servant_ = this->poa_->locate_servant_i (operation,
                                                     this->system_id_,
                                                     *this,
                                                     this->current_context_,
                                                     wait_occurred_restart_call
                                                     ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // The lookup had to wait; report failure so that the caller can
      // clean up and start this attempt over.
      if (wait_occurred_restart_call)
        {
          return TAO_Adapter::DS_FAILED;
        }
    }
#if (TAO_HAS_MINIMUM_CORBA == 0)
  ACE_CATCH (PortableServer::ForwardRequest, forward_request)
    {
      // Hand the caller its own reference to the forward target.
      forward_to =
        CORBA::Object::_duplicate (forward_request.forward_reference.in ());
      return TAO_Adapter::DS_FORWARD;
    }
#else
  ACE_CATCHANY
    {
      // <forward_to> is not used in this configuration; the exception
      // is passed on unchanged.
      ACE_UNUSED_ARG (forward_to);
      ACE_RE_THROW;
    }
#endif /* TAO_HAS_MINIMUM_CORBA */
  ACE_ENDTRY;

  // Now that we know the servant.
  this->current_context_.servant (this->servant_);

  // For servants from Servant Locators, there is no active object map
  // entry.
  if (this->active_object_map_entry ())
    this->current_context_.priority (this->active_object_map_entry ()->priority_);

  // Skip the release when <state_> already says the lock has been
  // released.
  if (this->state_ != OBJECT_ADAPTER_LOCK_RELEASED)
    {
      // Release the object adapter lock.
      this->object_adapter_->lock ().release ();

      // We have released the object adapter lock.  Record this for
      // later use.
      this->state_ = OBJECT_ADAPTER_LOCK_RELEASED;
    }

  // Serialize servants (if appropriate).
  this->single_threaded_poa_setup (ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (TAO_Adapter::DS_FAILED);

  // We have acquired the servant lock.  Record this for later use.
  this->state_ = SERVANT_LOCK_ACQUIRED;

  // After this point, <this->servant_> is ready for dispatching.
  return TAO_Adapter::DS_OK;
}

// Forwards the pre-invocation step for a remote request to the object
// adapter's servant dispatcher, passing along this upcall's POA,
// priority and pre-invoke state.
void
TAO_Object_Adapter::Servant_Upcall::pre_invoke_remote_request (
  TAO_ServerRequest &req
  ACE_ENV_ARG_DECL)
{
  TAO_Object_Adapter *adapter = this->object_adapter_;

  adapter->servant_dispatcher_->pre_invoke_remote_request (this->poa (),
                                                           this->priority (),
                                                           req,
                                                           this->pre_invoke_state_
                                                           ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

// Forwards the pre-invocation step for a collocated request to the
// object adapter's servant dispatcher, passing along this upcall's
// POA, priority and pre-invoke state.
void
TAO_Object_Adapter::Servant_Upcall::pre_invoke_collocated_request (
  ACE_ENV_SINGLE_ARG_DECL)
{
  TAO_Object_Adapter *adapter = this->object_adapter_;

  adapter->servant_dispatcher_->pre_invoke_collocated_request (this->poa (),
                                                               this->priority (),
                                                               this->pre_invoke_state_
                                                               ACE_ENV_ARG_PARAMETER);
  ACE_CHECK;
}

void
TAO_Object_Adapter::Servant_Upcall::post_invoke (void)
{
  this->object_adapter_->servant_dispatcher_->post_invoke (
    this->poa (),
    this->pre_invoke_state_);
}

// Default state: NO_ACTION_REQUIRED, with both saved priorities
// (native and CORBA) zeroed.
TAO_Object_Adapter::Servant_Upcall::Pre_Invoke_State::Pre_Invoke_State (void)
  : state_ (NO_ACTION_REQUIRED),
    original_native_priority_ (0),
    original_CORBA_priority_ (0)
{
}

// Locates the POA that <key> refers to and returns it, or returns 0
// when an exception was raised.  The object adapter lock acquired
// here is still held on return; <state_> records that, and
// upcall_cleanup() releases the lock for that state.
TAO_POA *
TAO_Object_Adapter::Servant_Upcall::lookup_POA (const TAO::ObjectKey &key
                                                ACE_ENV_ARG_DECL)
{
  // The object adapter lock comes first; failing to get it is
  // reported as OBJ_ADAPTER.
  if (this->object_adapter_->lock ().acquire () == -1)
    ACE_THROW_RETURN (CORBA::OBJ_ADAPTER (),
                      0);

  // Remember that the lock is now held.
  this->state_ = OBJECT_ADAPTER_LOCK_ACQUIRED;

  // Let any non-servant upcall made by another thread finish before
  // going on.
  this->object_adapter_->wait_for_non_servant_upcalls_to_complete (
    ACE_ENV_SINGLE_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  // Resolve the key; this fills in <system_id_> and <poa_>.
  this->object_adapter_->locate_poa (key,
                                     this->system_id_,
                                     this->poa_
                                     ACE_ENV_ARG_PARAMETER);
  ACE_CHECK_RETURN (0);

  return this->poa_;
}

TAO_Object_Adapter::Servant_Upcall::~Servant_Upcall (void)
{
  // Undo whatever stages of the upcall are still recorded in <state_>.
  upcall_cleanup ();
}

// Undoes, in reverse order, every stage of the upcall that <state_>
// says was completed, then puts <state_> back to INITIAL_STAGE.
//
// The cases deliberately fall through: a later stage implies that all
// earlier stages also need undoing.
//
// This is called from prepare_for_upcall() before a restart and again
// from the destructor.  Resetting <state_> at the end makes the second
// call harmless when the restarted attempt fails before it records a
// new state (for example when acquiring the object adapter lock
// fails); without the reset the stale state would be replayed, the
// POA cleanup would run twice and a lock that is not held would be
// released.
void
TAO_Object_Adapter::Servant_Upcall::upcall_cleanup (void)
{
  this->post_invoke ();

  switch (this->state_)
    {
    case SERVANT_LOCK_ACQUIRED:
      // Unlock servant (if appropriate).
      this->single_threaded_poa_cleanup ();

      /* FALLTHRU */

    case OBJECT_ADAPTER_LOCK_RELEASED:
      // Cleanup servant locator related state.  Note that because
      // this operation does not change any Object Adapter related
      // state, it is ok to call it outside the lock.
      this->servant_locator_cleanup ();

      // Since the object adapter lock was released, we must acquire
      // it.
      //
      // Note that errors are ignored here since we cannot do much
      // with it.
      this->object_adapter_->lock ().acquire ();

      // Check if a non-servant upcall is in progress.  If a
      // non-servant upcall is in progress, wait for it to complete.
      // Unless of course, the thread making the non-servant upcall is
      // this thread.
      this->object_adapter_->wait_for_non_servant_upcalls_to_complete ();

      // Cleanup servant related state.
      this->servant_cleanup ();

      /* FALLTHRU */

    case POA_CURRENT_SETUP:
      // Cleanup POA related state.
      this->poa_cleanup ();

      // Teardown current for this request.
      this->current_context_.teardown ();

      /* FALLTHRU */

    case OBJECT_ADAPTER_LOCK_ACQUIRED:
      // Finally, since the object adapter lock was acquired, we must
      // release it.
      this->object_adapter_->lock ().release ();

      /* FALLTHRU */

    case INITIAL_STAGE:
    default:
      // @@ Keep compiler happy, the states above are the only
      //    possible ones.
      break;
    }

  // Everything recorded in <state_> has been undone; start over from
  // the initial stage so that a repeated cleanup does nothing more.
  this->state_ = INITIAL_STAGE;
}

// Blocks on <non_servant_upcall_condition_> for as long as locking is
// enabled and a non-servant upcall made by a different thread is in
// progress.  Raises CORBA::OBJ_ADAPTER when the wait itself fails.
void
TAO_Object_Adapter::wait_for_non_servant_upcalls_to_complete (CORBA::Environment &ACE_TRY_ENV)
{
#if defined (ACE_HAS_EXCEPTIONS)
  ACE_UNUSED_ARG (ACE_TRY_ENV); // FUZZ: ignore check_for_ace_check
#endif

  for (;;)
    {
      // Nothing to wait for without locking or without an upcall in
      // progress.
      if (!this->enable_locking_ ||
          !this->non_servant_upcall_in_progress_)
        break;

      // The upcall in progress belongs to this very thread; waiting
      // for it here would never end.
      if (ACE_OS::thr_equal (this->non_servant_upcall_thread_,
                             ACE_OS::thr_self ()))
        break;

      // Another thread is in a non-servant upcall: wait, then look
      // at the conditions again.
      if (this->non_servant_upcall_condition_.wait () == -1)
        ACE_THROW (CORBA::OBJ_ADAPTER ());
    }
}

void
TAO_Object_Adapter::wait_for_non_servant_upcalls_to_complete (void)
{
  // Non-exception throwing version.
  ACE_TRY_NEW_ENV
    {
      this->wait_for_non_servant_upcalls_to_complete (ACE_TRY_ENV);
      ACE_TRY_CHECK;
    }
  ACE_CATCHANY
    {
      ACE_ERROR ((LM_ERROR,
                  "TAO_Object_Adapter::wait_for_non_servant_upcalls_to_complete "
                  "threw exception it should not have!\n"));

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?