asynch_reply_dispatcher.cpp
来自「这是广泛使用的通信开源项目,对于大容量,高并发的通讯要求完全能够胜任,他广泛可用」· C++ 代码 · 共 293 行
CPP
293 行
// Asynch_Reply_Dispatcher.cpp,v 1.10 2003/12/08 04:36:24 bala Exp
#include "Asynch_Reply_Dispatcher.h"
#include "tao/Pluggable_Messaging_Utils.h"
#include "tao/ORB_Core.h"
#include "tao/debug.h"
#include "tao/ORB_Core.h"
#include "tao/Transport.h"
#include "ace/CORBA_macros.h"
#if !defined (__ACE_INLINE__)
#include "Asynch_Reply_Dispatcher.i"
#endif /* __ACE_INLINE__ */
ACE_RCSID(Messaging, Asynch_Reply_Dispatcher, "Asynch_Reply_Dispatcher.cpp,v 1.10 2003/12/08 04:36:24 bala Exp")
// Constructor.
TAO_Asynch_Reply_Dispatcher::TAO_Asynch_Reply_Dispatcher (
const TAO_Reply_Handler_Skeleton &reply_handler_skel,
Messaging::ReplyHandler_ptr reply_handler,
TAO_ORB_Core *orb_core
)
:TAO_Asynch_Reply_Dispatcher_Base (orb_core)
, reply_handler_skel_ (reply_handler_skel)
, reply_handler_ (Messaging::ReplyHandler::_duplicate (reply_handler))
, timeout_handler_ (0)
{
}
// Destructor.
TAO_Asynch_Reply_Dispatcher::~TAO_Asynch_Reply_Dispatcher (void)
{
}
// Dispatch the reply.
int
TAO_Asynch_Reply_Dispatcher::dispatch_reply (
TAO_Pluggable_Reply_Params ¶ms
)
{
if (params.input_cdr_ == 0)
return -1;
bool cont_dispatch =
this->try_dispatch_reply ();
if (this->timeout_handler_)
{
// If we had registered timeout handlers just cancel them and
// loose ownership of the handlers
this->timeout_handler_->cancel ();
this->timeout_handler_->remove_reference ();
this->timeout_handler_ = 0;
// AMI Timeout Handling End
}
// A simple protocol that we follow. If the timeout had been handled
// by another thread, the last refcount for us will be held by the
// timeout handler. Hence the above call to remove_reference () will
// delete us. We then have to rely on the status of our stack
// variable to exit safely.
if (!cont_dispatch)
return 0;
this->reply_status_ = params.reply_status_;
// Transfer the <params.input_cdr_>'s content to this->reply_cdr_
ACE_Data_Block *db =
this->reply_cdr_.clone_from (*params.input_cdr_);
if (db == 0)
{
if (TAO_debug_level > 2)
ACE_ERROR ((
LM_ERROR,
"TAO (%P|%t) - Asynch_Reply_Dispatcher::dispatch_reply ",
"clone_from failed \n"));
return -1;
}
// See whether we need to delete the data block by checking the
// flags. We cannot be happy that we initally allocated the
// datablocks of the stack. If this method is called twice, as is in
// some cases where the same invocation object is used to make two
// invocations like forwarding, the release becomes essential.
if (ACE_BIT_DISABLED (db->flags (),
ACE_Message_Block::DONT_DELETE))
db->release ();
// Steal the buffer, that way we don't do any unnecesary copies of
// this data.
CORBA::ULong max = params.svc_ctx_.maximum ();
CORBA::ULong len = params.svc_ctx_.length ();
IOP::ServiceContext *context_list = params.svc_ctx_.get_buffer (1);
this->reply_service_info_.replace (max, len, context_list, 1);
if (TAO_debug_level >= 4)
{
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("TAO_Messaging (%P|%t) - Asynch_Reply_Dispatcher::")
ACE_TEXT ("dispatch_reply\n")));
}
CORBA::ULong reply_error = TAO_AMI_REPLY_NOT_OK;
switch (this->reply_status_)
{
case TAO_PLUGGABLE_MESSAGE_NO_EXCEPTION:
reply_error = TAO_AMI_REPLY_OK;
break;
case TAO_PLUGGABLE_MESSAGE_USER_EXCEPTION:
reply_error = TAO_AMI_REPLY_USER_EXCEPTION;
break;
case TAO_PLUGGABLE_MESSAGE_SYSTEM_EXCEPTION:
reply_error = TAO_AMI_REPLY_SYSTEM_EXCEPTION;
break;
default:
case TAO_PLUGGABLE_MESSAGE_LOCATION_FORWARD:
// @@ Michael: Not even the spec mentions this case.
// We have to think about this case.
// Handle the forwarding and return so the stub restarts the
// request!
reply_error = TAO_AMI_REPLY_NOT_OK;
break;
}
if (!CORBA::is_nil (this->reply_handler_.in ()))
{
ACE_TRY_NEW_ENV
{
// Call the Reply Handler's skeleton.
reply_handler_skel_ (this->reply_cdr_,
this->reply_handler_.in (),
reply_error
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
ACE_CATCHANY
{
if (TAO_debug_level >= 4)
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Exception during reply handler");
}
ACE_ENDTRY;
}
this->decr_refcount ();
return 1;
}
void
TAO_Asynch_Reply_Dispatcher::connection_closed (void)
{
ACE_TRY_NEW_ENV
{
bool cont_dispatch =
this->try_dispatch_reply ();
if (this->timeout_handler_)
{
// If we had registered timeout handlers just cancel them and
// loose ownership of the handlers
this->timeout_handler_->cancel ();
this->timeout_handler_->remove_reference ();
this->timeout_handler_ = 0;
}
// AMI Timeout Handling End
// A simple protocol that we follow. If the timeout had been handled
// by another thread, the last refcount for us will be held by the
// timeout handler. Hence the above call to remove_reference () will
// delete us. We then have to rely on the status of our stack
// variable to exit safely.
if (!cont_dispatch)
return;
// Generate a fake exception....
CORBA::COMM_FAILURE comm_failure (0, CORBA::COMPLETED_MAYBE);
TAO_OutputCDR out_cdr;
comm_failure._tao_encode (out_cdr ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// Turn into an output CDR
TAO_InputCDR cdr (out_cdr);
if (!CORBA::is_nil (this->reply_handler_.in ()))
{
this->reply_handler_skel_ (cdr,
this->reply_handler_.in (),
TAO_AMI_REPLY_SYSTEM_EXCEPTION
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
ACE_CATCHANY
{
if (TAO_debug_level >= 4)
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Asynch_Reply_Dispacher::connection_closed");
}
}
ACE_ENDTRY;
(void) this->decr_refcount ();
}
// AMI Timeout Handling Begin
void
TAO_Asynch_Reply_Dispatcher::reply_timed_out (void)
{
ACE_DECLARE_NEW_CORBA_ENV;
ACE_TRY
{
// Generate a fake exception....
CORBA::TIMEOUT timeout_failure (
CORBA::SystemException::_tao_minor_code (
TAO_TIMEOUT_RECV_MINOR_CODE,
errno),
CORBA::COMPLETED_MAYBE);
TAO_OutputCDR out_cdr;
timeout_failure._tao_encode (out_cdr ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
// This is okay here... Everything relies on our refcount being
// held by the timeout handler, whose refcount in turn is held
// by the reactor.
if (!this->try_dispatch_reply ())
return;
// Turn into an output CDR
TAO_InputCDR cdr (out_cdr);
if (!CORBA::is_nil (this->reply_handler_.in ()))
{
this->reply_handler_skel_ (cdr,
this->reply_handler_.in (),
TAO_AMI_REPLY_SYSTEM_EXCEPTION
ACE_ENV_ARG_PARAMETER);
ACE_TRY_CHECK;
}
}
ACE_CATCHANY
{
if (TAO_debug_level >= 4)
{
ACE_PRINT_EXCEPTION (ACE_ANY_EXCEPTION,
"Asynch_Reply_Dispacher::reply_timed_out");
}
}
ACE_ENDTRY;
ACE_CHECK;
(void) this->decr_refcount ();
}
long
TAO_Asynch_Reply_Dispatcher::schedule_timer (CORBA::ULong request_id,
const ACE_Time_Value &max_wait_time
ACE_ENV_ARG_DECL)
{
if (this->timeout_handler_ == 0)
{
// @@ Need to use the pool for this..
ACE_NEW_THROW_EX (this->timeout_handler_,
TAO_Asynch_Timeout_Handler (
this,
this->transport_->orb_core ()->reactor ()),
CORBA::NO_MEMORY ());
}
return this->timeout_handler_->schedule_timer (
this->transport_->tms (),
request_id,
max_wait_time);
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?