giopimpl10.cc

来自「编译工具」· CC 代码 · 共 1,684 行 · 第 1/4 页

CC
1,684
字号
// -*- Mode: C++; -*-//                            Package   : omniORB2// giopImpl10.cc              Created on: 14/02/2001//                            Author    : Sai Lai Lo (sll)////    Copyright (C) 2001 AT&T Laboratories, Cambridge////    This file is part of the omniORB library////    The omniORB library is free software; you can redistribute it and/or//    modify it under the terms of the GNU Library General Public//    License as published by the Free Software Foundation; either//    version 2 of the License, or (at your option) any later version.////    This library is distributed in the hope that it will be useful,//    but WITHOUT ANY WARRANTY; without even the implied warranty of//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU//    Library General Public License for more details.////    You should have received a copy of the GNU Library General Public//    License along with this library; if not, write to the Free//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  //    02111-1307, USA////// Description://	*** PROPRIETORY INTERFACE ***//	/*  $Log: giopImpl10.cc,v $  Revision 1.1.4.22  2005/04/10 22:17:19  dgrisby  Fixes to connection management. Thanks Jon Biggar.  Revision 1.1.4.21  2004/07/01 19:16:25  dgrisby  Client call interceptor oneway and response expected flipped. Thanks  John Fardo.  Revision 1.1.4.20  2003/07/25 16:07:18  dgrisby  Incorrect COMM_FAILURE with GIOP 1.2 CloseConnection.  Revision 1.1.4.19  2003/01/22 11:40:12  dgrisby  Correct serverSendException interceptor use.  Revision 1.1.4.18  2002/11/26 16:54:34  dgrisby  Fix exception interception.  Revision 1.1.4.17  2002/11/26 14:51:50  dgrisby  Implement missing interceptors.  Revision 1.1.4.16  2002/07/04 15:14:40  dgrisby  Correct usage of MessageErrors, fix log messages.  Revision 1.1.4.15  2002/03/27 11:44:51  dpg1  Check in interceptors things left over from last week.  Revision 1.1.4.14  2002/03/18 12:38:25  dpg1  Lower trace(0) to trace(1), propagate fatalException.  Revision 1.1.4.13  2001/10/19 11:06:45  dpg1  Principal support for GIOP 1.0. Correct some spelling mistakes.  Revision 1.1.4.12  2001/09/12 19:43:19  sll  Enforce GIOP message size limit.  Revision 1.1.4.11  2001/09/10 17:46:09  sll  When a connection is broken, check if it has been shutdown orderly. If so,  do a retry.  Revision 1.1.4.10  2001/09/04 14:38:51  sll  Added the boolean argument to notifyCommFailure to indicate if  omniTransportLock is held by the caller.  Revision 1.1.4.9  2001/09/03 16:55:41  sll  Modified to match the new signature of the giopStream member functions that  previously accept explicit deadline parameters. The deadline is now  implicit in the giopStream.  Revision 1.1.4.8  2001/08/17 17:12:36  sll  Modularise ORB configuration parameters.  Revision 1.1.4.7  2001/07/31 16:20:30  sll  New primitives to acquire read lock on a connection.  Revision 1.1.4.6  2001/06/20 18:35:18  sll  Upper case send,recv,connect,shutdown to avoid silly substutition by  macros defined in socket.h to rename these socket functions  to something else.  Revision 1.1.4.5  2001/05/11 14:30:12  sll  Message size limit is now enforced.  Revision 1.1.4.4  2001/05/04 13:55:58  sll  Silly mistake that causes non-copy marshalling to do the wrong thing  in GIOP 1.0.  Revision 1.1.4.3  2001/05/01 17:56:29  sll  Remove user exception check in sendUserException. This has been done by  the caller.  Revision 1.1.4.2  2001/05/01 17:15:18  sll  Non-copy input now works correctly.  Revision 1.1.4.1  2001/04/18 18:10:51  sll  Big checkin with the brand new internal APIs.*/#include <omniORB4/CORBA.h>#include <giopStream.h>#include <giopStreamImpl.h>#include <giopStrand.h>#include <giopRope.h>#include <GIOP_S.h>#include <GIOP_C.h>#include <omniORB4/minorCode.h>#include <initialiser.h>#include <exceptiondefs.h>#include <omniORB4/callDescriptor.h>#include <omniORB4/omniInterceptors.h>#include <interceptors.h>#include <orbParameters.h>// Define PRE_CALCULATE_MESSAGE_SIZE to force the code to pre-calculate// the total size of the message before the actual marshalling.// The implementation can actually avoid this calculation if full message// can be fully buffered.// We however have to force pre-calculation because at the moment, the Any// marshalling implementation is not re-entrant. #define PRE_CALCULATE_MESSAGE_SIZEOMNI_NAMESPACE_BEGIN(omni)class nonexistence;class giopImpl10 {public:  static void outputMessageBegin(giopStream*,				 void (*marshalHeader)(giopStream*));  static void outputMessageEnd(giopStream*);  static void inputMessageBegin(giopStream*,				void (*unmarshalHeader)(giopStream*));  static void inputMessageEnd(giopStream*,CORBA::Boolean skip = 0);  static void sendMsgErrorMessage(giopStream*);  static void marshalRequestHeader(giopStream*);  static void sendLocateRequest(giopStream*);  static void unmarshalReplyHeader(giopStream*);  static void unmarshalLocateReply(giopStream*);  static void unmarshalWildCardRequestHeader(giopStream*);  static void unmarshalRequestHeader(giopStream*);  static void unmarshalLocateRequest(giopStream*);  static void marshalReplyHeader(giopStream*);  static void sendSystemException(giopStream*,const CORBA::SystemException&);  static void sendUserException(giopStream*,const CORBA::UserException&);  static void sendLocationForwardReply(giopStream*,CORBA::Object_ptr,				       CORBA::Boolean=0);  static void sendLocateReply(giopStream*,GIOP::LocateStatusType,			      CORBA::Object_ptr,CORBA::SystemException* p = 0);  static size_t inputRemaining(giopStream*);  static void getInputData(giopStream*,omni::alignment_t,size_t);  static void skipInputData(giopStream*,size_t);  static void copyInputData(giopStream*,void*, size_t,omni::alignment_t);  static size_t outputRemaining(const giopStream*);  static void getReserveSpace(giopStream*,omni::alignment_t,size_t);  static void copyOutputData(giopStream*,void*, size_t,omni::alignment_t);  static CORBA::ULong currentInputPtr(const giopStream*);  static CORBA::ULong currentOutputPtr(const giopStream*);  friend class nonexistence;  // Just to make gcc shut up.  static CORBA::Boolean inputReplyBegin(giopStream*, 					void (*unmarshalHeader)(giopStream*));  static void inputTerminalProtocolError(giopStream* g);  // Helper function.  Call this function to indicate that a protocol  // voilation was detected.  This function *always* raise a  // giopStream::CommFailure exception.  Therefore the caller should not  // expect this function to return.  static void inputRaiseCommFailure(giopStream* g);  static void outputNewMessage(giopStream*);  static void outputFlush(giopStream* g);  static void outputSetMessageSize(giopStream*,CORBA::ULong);private:  giopImpl10();  giopImpl10(const giopImpl10&);  giopImpl10& operator=(const giopImpl10&);};////////////////////////////////////////////////////////////////////////voidgiopImpl10::inputMessageBegin(giopStream* g,			      void (*unmarshalHeader)(giopStream*)) { again:  {    omni_tracedmutex_lock sync(*omniTransportLock);    while (!(g->inputFullyBuffered() || g->pd_rdlocked)) {      if (!g->rdLockNonBlocking()) {	g->sleepOnRdLock();      }    }  }  if (!g->pd_currentInputBuffer) {    if (g->pd_input) {      g->pd_currentInputBuffer = g->pd_input;      g->pd_input = g->pd_input->next;      g->pd_currentInputBuffer->next = 0;    }    else {      g->pd_currentInputBuffer = g->inputMessage();    }  }  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  if (hdr[4] != 1 || hdr[5] != 0) {    // We accept a CloseConnection message with any GIOP version.    if ((GIOP::MsgType)hdr[7] != GIOP::CloseConnection) {      inputTerminalProtocolError(g);      // never reaches here.    }  }  g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_)			       ? 0 : 1 );  g->pd_inb_mkr = (void*)(hdr + 12);  g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			  g->pd_currentInputBuffer->last);  g->inputExpectAnotherFragment(0);  g->inputMessageSize(g->pd_currentInputBuffer->size);  g->inputFragmentToCome(g->pd_currentInputBuffer->size - 			 (g->pd_currentInputBuffer->last -			  g->pd_currentInputBuffer->start));  if (unmarshalHeader == unmarshalWildCardRequestHeader)    unmarshalHeader(g);  else {    if (!inputReplyBegin(g,unmarshalHeader)) goto again;  }}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopImpl10::inputReplyBegin(giopStream* g, 			    void (*unmarshalHeader)(giopStream*)) {  if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) {    // This is not the perfect solution if we are multiplexing calls onto    // this connection. The reply we are getting here may not be for the    // call of this thread. If that is the case, we are sending the wrong    // exception to the threads. On the otherhand, the server side is    // quite doggy and it is not prudent to trust the message content.    // Therefore, decoding the message further may not be doing any good.    OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient,		  CORBA::COMPLETED_YES);  }  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  GIOP::MsgType mtype = (GIOP::MsgType)hdr[7];  switch (mtype) {  case GIOP::LocateReply:    {      unmarshalLocateReply(g);      break;    }  case GIOP::Reply:    {      unmarshalReplyHeader(g);      break;    }  case GIOP::CloseConnection:    {      CORBA::ULong minor;      CORBA::Boolean retry;      g->pd_strand->orderly_closed = 1;      g->notifyCommFailure(0,minor,retry);      g->pd_strand->state(giopStrand::DYING);      giopStream::CommFailure::_raise(minor,				      CORBA::COMPLETED_NO,				      retry,__FILE__,__LINE__);      // never reach here.      break;    }  default:    inputTerminalProtocolError(g);    // never reaches here.  }  GIOP_C* source = (GIOP_C*) g;  if (source->replyId() == source->requestId()) {    if (mtype == GIOP::LocateReply && 	unmarshalHeader != unmarshalLocateReply) {      inputTerminalProtocolError(g);      // never reach here    }    else if (mtype == GIOP::Reply &&	     unmarshalHeader != unmarshalReplyHeader) {      inputTerminalProtocolError(g);      // never reach here    }    giopStream_Buffer* p = g->pd_input;    while (p) {      giopStream_Buffer* q = p->next;      giopStream_Buffer::deleteBuffer(p);      p = q;    }    g->pd_input = 0;    g->inputMatchedId(1);    return 1;  }  else {    omni_tracedmutex_lock sync(*omniTransportLock);    giopStreamList* gp = g->pd_strand->clients.next;    for (; gp != &g->pd_strand->clients; gp = gp->next) {      GIOP_C* target = (GIOP_C*)gp;      if (target->state() != IOP_C::UnUsed &&	  target->requestId() == source->replyId()) {	if (target->inputMatchedId()) {	  // a reply has already been received!	  inputTerminalProtocolError(g);	}	target->pd_input = source->pd_input;	source->pd_input = 0;	giopStream_Buffer** pp = &target->pd_input;	while (*pp) {	  pp = &((*pp)->next);	}	*pp = source->pd_currentInputBuffer;	source->pd_currentInputBuffer = 0;	target->inputFullyBuffered(source->inputFullyBuffered());	source->inputFullyBuffered(0);	target->pd_rdlocked = 1;	source->pd_rdlocked = 0;	target->inputMatchedId(1);	giopStream::wakeUpRdLock(g->pd_strand);	return 0;      }    }    // reach here only if there is no match to the replyid.    // Skip this message    giopStream_Buffer* p = g->pd_input;    while (p) {      giopStream_Buffer* q = p->next;      giopStream_Buffer::deleteBuffer(p);      p = q;    }    g->pd_input = 0;    if (g->pd_currentInputBuffer) {      giopStream_Buffer::deleteBuffer(g->pd_currentInputBuffer);      g->pd_currentInputBuffer = 0;    }    while (g->inputFragmentToCome()) {      giopStream_Buffer* dummy = g->inputChunk(g->inputFragmentToCome());      g->inputFragmentToCome( g->inputFragmentToCome() -			      (dummy->last - dummy->start) );      giopStream_Buffer::deleteBuffer(dummy);    }    return 0;  }}////////////////////////////////////////////////////////////////////////voidgiopImpl10::inputMessageEnd(giopStream* g,CORBA::Boolean disgard) {  if ( g->pd_strand->state() != giopStrand::DYING ) {    if (!disgard && inputRemaining(g)) {      if (omniORB::trace(15)) {	omniORB::logger l;	l << "Garbage left at the end of input message from "	  << g->pd_strand->connection->peeraddress() << "\n";      }      if (!orbParameters::strictIIOP) {	disgard = 1;      }      else {	inputTerminalProtocolError(g);	// never reach here.      }    }    if (disgard)      skipInputData(g,inputRemaining(g));    if (g->pd_currentInputBuffer) {      g->releaseInputBuffer(g->pd_currentInputBuffer);      g->pd_currentInputBuffer = 0;    }  }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?