asynch_reply_dispatcher.cpp

来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 293 行

CPP
293
字号
// Asynch_Reply_Dispatcher.cpp,v 1.10 2003/12/08 04:36:24 bala Exp

#include "Asynch_Reply_Dispatcher.h"

#include "tao/Pluggable_Messaging_Utils.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/ORB_Core.h"
#include "tao/Transport.h"

#include "ace/CORBA_macros.h"

#if !defined (__ACE_INLINE__)
#include "Asynch_Reply_Dispatcher.i"
#endif /* __ACE_INLINE__ */

ACE_RCSID(Messaging, Asynch_Reply_Dispatcher, "Asynch_Reply_Dispatcher.cpp,v 1.10 2003/12/08 04:36:24 bala Exp")


// Constructor.
//
// Forwards <orb_core> to the base dispatcher, stores the reply handler
// skeleton, keeps its own duplicated reference to <reply_handler>, and
// starts out with no timeout handler (one is created on demand by
// schedule_timer ()).
TAO_Asynch_Reply_Dispatcher::TAO_Asynch_Reply_Dispatcher (
    const TAO_Reply_Handler_Skeleton &reply_handler_skel,
    Messaging::ReplyHandler_ptr reply_handler,
    TAO_ORB_Core *orb_core)
  : TAO_Asynch_Reply_Dispatcher_Base (orb_core),
    reply_handler_skel_ (reply_handler_skel),
    reply_handler_ (Messaging::ReplyHandler::_duplicate (reply_handler)),
    timeout_handler_ (0)
{
  // All state is set up in the initializer list.
}

// Destructor.
TAO_Asynch_Reply_Dispatcher::~TAO_Asynch_Reply_Dispatcher ()
{
  // Intentionally empty: no explicit cleanup is performed here.
}

// Dispatch the reply.
//
// Takes the reply described by <params>, copies its CDR stream and
// steals its service context list, maps the pluggable reply status to
// an AMI reply status and invokes the reply handler skeleton with it.
//
// Return values:
//   -1  <params> carries no input CDR, or cloning its contents failed.
//    0  try_dispatch_reply () returned false, so nothing is dispatched
//       from here.
//    1  the reply was processed (the skeleton is only invoked when the
//       reply handler is non-nil; exceptions it raises are caught and
//       at most printed) and decr_refcount () was called.
int
TAO_Asynch_Reply_Dispatcher::dispatch_reply (
    TAO_Pluggable_Reply_Params &params
  )
{
  if (params.input_cdr_ == 0)
    return -1;

  // Remember the outcome in a stack variable: after the timeout
  // handler is released below, <this> may no longer be usable.
  bool cont_dispatch =
    this->try_dispatch_reply ();

  if (this->timeout_handler_)
    {
      // If we had registered timeout handlers just cancel them and
      // lose ownership of the handlers
      this->timeout_handler_->cancel ();
      this->timeout_handler_->remove_reference ();
      this->timeout_handler_ = 0;
      // AMI Timeout Handling End
    }

  // A simple protocol that we follow. If the timeout had been handled
  // by another thread, the last refcount for us will be held by the
  // timeout handler. Hence the above call to remove_reference () will
  // delete us. We then have to rely on the status of our stack
  // variable to exit safely.
  if (!cont_dispatch)
    return 0;

  this->reply_status_ = params.reply_status_;

  // Transfer the <params.input_cdr_>'s content to this->reply_cdr_
  ACE_Data_Block *db =
    this->reply_cdr_.clone_from (*params.input_cdr_);

  if (db == 0)
    {
      if (TAO_debug_level > 2)
        // The two literals must be adjacent (no comma between them) so
        // that they form a single format string; otherwise the second
        // one is passed as an unused argument and never printed.
        ACE_ERROR ((
          LM_ERROR,
          ACE_TEXT ("TAO (%P|%t) - Asynch_Reply_Dispatcher::dispatch_reply ")
          ACE_TEXT ("clone_from failed \n")));
      return -1;
    }

  // See whether we need to delete the data block by checking the
  // flags. We cannot rely on the data blocks having been allocated
  // on the stack initially. If this method is called twice, as is in
  // some cases where the same invocation object is used to make two
  // invocations like forwarding, the release becomes essential.
  if (ACE_BIT_DISABLED (db->flags (),
                        ACE_Message_Block::DONT_DELETE))
    db->release ();

  // Steal the buffer, that way we don't do any unnecessary copies of
  // this data.
  CORBA::ULong max = params.svc_ctx_.maximum ();
  CORBA::ULong len = params.svc_ctx_.length ();
  IOP::ServiceContext *context_list = params.svc_ctx_.get_buffer (1);
  this->reply_service_info_.replace (max, len, context_list, 1);


  if (TAO_debug_level >= 4)
    {
      ACE_DEBUG ((LM_DEBUG,
                  ACE_TEXT ("TAO_Messaging (%P|%t) - Asynch_Reply_Dispatcher::")
                  ACE_TEXT ("dispatch_reply\n")));
    }



  // Translate the pluggable-protocol reply status into the status
  // code handed to the reply handler skeleton.
  CORBA::ULong reply_error = TAO_AMI_REPLY_NOT_OK;
  switch (this->reply_status_)
    {
    case TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION:
      reply_error = TAO_AMI_REPLY_OK;
      break;
    case TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION:
      reply_error = TAO_AMI_REPLY_USER_EXCEPTION;
      break;
    case TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION:
      reply_error = TAO_AMI_REPLY_SYSTEM_EXCEPTION;
      break;
    default:
    case TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD:
      // @@ Michael: Not even the spec mentions this case.
      //             We have to think about this case.
      // Handle the forwarding and return so the stub restarts the
      // request!
      reply_error = TAO_AMI_REPLY_NOT_OK;
      break;
    }

  if (!CORBA::is_nil (this->reply_handler_.in ()))
    {
      ACE_TRY_NEW_ENV
        {
          // Call the Reply Handler's skeleton.
          reply_handler_skel_ (this->reply_cdr_,
                               this->reply_handler_.in (),
                               reply_error
                                ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
      ACE_CATCHANY
        {
          // Exceptions from the skeleton are swallowed on purpose;
          // they are only reported at higher debug levels.
          if (TAO_debug_level >= 4)
            ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                                 "Exception during reply handler");
        }
      ACE_ENDTRY;
    }

  this->decr_refcount ();

  return 1;
}

// Called when the connection is closed while the reply is pending.
//
// Cancels and releases any registered timeout handler and, if
// try_dispatch_reply () allowed it, delivers a synthesized
// CORBA::COMM_FAILURE (COMPLETED_MAYBE) to the reply handler skeleton
// as TAO_AMI_REPLY_SYSTEM_EXCEPTION. Any exception raised along the
// way is caught and only printed when TAO_debug_level >= 4.
void
TAO_Asynch_Reply_Dispatcher::connection_closed (void)
{
  ACE_TRY_NEW_ENV
    {
      // Keep the result on the stack; see the protocol note below.
      bool cont_dispatch =
        this->try_dispatch_reply ();

      if (this->timeout_handler_)
        {
          // If we had registered timeout handlers just cancel them and
          // lose ownership of the handlers
          this->timeout_handler_->cancel ();
          this->timeout_handler_->remove_reference ();
          this->timeout_handler_ = 0;
        }

      // AMI Timeout Handling End

      // A simple protocol that we follow. If the timeout had been handled
      // by another thread, the last refcount for us will be held by the
      // timeout handler. Hence the above call to remove_reference () will
      // delete us. We then have to rely on the status of our stack
      // variable to exit safely.
      // Note that this early return skips decr_refcount () below.
      if (!cont_dispatch)
        return;

      // Generate a fake exception....
      CORBA::COMM_FAILURE comm_failure (0, CORBA::COMPLETED_MAYBE);

      TAO_OutputCDR out_cdr;

      // Marshal the exception so the skeleton can demarshal it like a
      // reply that arrived over the wire.
      comm_failure._tao_encode (out_cdr ACE_ENV_ARG_PARAMETER);

      ACE_TRY_CHECK;

      // Build an input CDR from the output CDR just written.
      TAO_InputCDR cdr (out_cdr);

      if (!CORBA::is_nil (this->reply_handler_.in ()))
        {
          this->reply_handler_skel_ (cdr,
                                     this->reply_handler_.in (),
                                     TAO_AMI_REPLY_SYSTEM_EXCEPTION
                                      ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
    }
  ACE_CATCHANY
    {
      if (TAO_debug_level >= 4)
        {
          // NOTE(review): the label below spells "Dispacher"; it is
          // runtime text and is deliberately left unchanged here.
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "Asynch_Reply_Dispacher::connection_closed");
        }

    }
  ACE_ENDTRY;

  // Reached after a dispatch attempt, whether or not it raised.
  (void) this->decr_refcount ();
}

// AMI Timeout Handling Begin

// Called when the reply timeout fires.
//
// Builds a CORBA::TIMEOUT (COMPLETED_MAYBE) whose minor code combines
// TAO_TIMEOUT_RECV_MINOR_CODE with the current errno, marshals it and,
// if try_dispatch_reply () allows it, delivers it to the reply handler
// skeleton as TAO_AMI_REPLY_SYSTEM_EXCEPTION. Any exception raised
// along the way is caught and only printed when TAO_debug_level >= 4.
void
TAO_Asynch_Reply_Dispatcher::reply_timed_out (void)
{
  ACE_DECLARE_NEW_CORBA_ENV;

  ACE_TRY
    {
      // Generate a fake exception....
      CORBA::TIMEOUT timeout_failure (
        CORBA::SystemException::_tao_minor_code (
            TAO_TIMEOUT_RECV_MINOR_CODE,
            errno),
         CORBA::COMPLETED_MAYBE);

      TAO_OutputCDR out_cdr;

      // Marshal the exception before checking whether we may dispatch.
      timeout_failure._tao_encode (out_cdr ACE_ENV_ARG_PARAMETER);
      ACE_TRY_CHECK;

      // This is okay here... Everything relies on our refcount being
      // held by the timeout handler, whose refcount in turn is held
      // by the reactor.
      // Note that this early return skips decr_refcount () below.
      if (!this->try_dispatch_reply ())
        return;

      // Build an input CDR from the output CDR just written.
      TAO_InputCDR cdr (out_cdr);

      if (!CORBA::is_nil (this->reply_handler_.in ()))
        {
          this->reply_handler_skel_ (cdr,
                                     this->reply_handler_.in (),
                                     TAO_AMI_REPLY_SYSTEM_EXCEPTION
                                      ACE_ENV_ARG_PARAMETER);
          ACE_TRY_CHECK;
        }
    }
  ACE_CATCHANY
    {
      if (TAO_debug_level >= 4)
        {
          // NOTE(review): the label below spells "Dispacher"; it is
          // runtime text and is deliberately left unchanged here.
          ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
                               "Asynch_Reply_Dispacher::reply_timed_out");
        }

    }
  ACE_ENDTRY;
  ACE_CHECK;

  // Reached after a dispatch attempt, whether or not it raised.
  (void) this->decr_refcount ();
}

// Schedule the reply timeout for the request identified by
// <request_id>, to fire after <max_wait_time>.
//
// The timeout handler is created on first use and reused on later
// calls. Raises CORBA::NO_MEMORY if the handler cannot be allocated.
//
// Returns the value of the handler's schedule_timer () call, or -1
// when the allocation failure was reported through the CORBA
// environment (builds without native exceptions).
long
TAO_Asynch_Reply_Dispatcher::schedule_timer (CORBA::ULong request_id,
                                             const ACE_Time_Value &max_wait_time
                                             ACE_ENV_ARG_DECL)
{
  if (this->timeout_handler_ == 0)
    {
      // @@ Need to use the pool for this..
      ACE_NEW_THROW_EX (this->timeout_handler_,
                        TAO_Asynch_Timeout_Handler (
                          this,
                          this->transport_->orb_core ()->reactor ()),
                        CORBA::NO_MEMORY ());
      // Without native exceptions the macro above only records the
      // exception in the environment and falls through; bail out
      // here instead of dereferencing a null handler below.
      ACE_CHECK_RETURN (-1);
    }

  return this->timeout_handler_->schedule_timer (
      this->transport_->tms (),
      request_id,
      max_wait_time);
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?