📄 giopimpl11.cc
字号:
// -*- Mode: C++; -*-
//                            Package   : omniORB2
// giopImpl11.cc              Created on: 14/02/2001
//                            Author    : Sai Lai Lo (sll)
//
//    Copyright (C) 2001 AT&T Laboratories, Cambridge
//
//    This file is part of the omniORB library
//
//    The omniORB library is free software; you can redistribute it and/or
//    modify it under the terms of the GNU Library General Public
//    License as published by the Free Software Foundation; either
//    version 2 of the License, or (at your option) any later version.
//
//    This library is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
//    Library General Public License for more details.
//
//    You should have received a copy of the GNU Library General Public
//    License along with this library; if not, write to the Free
//    Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
//    02111-1307, USA
//
//
// Description:
//    *** PROPRIETARY INTERFACE ***
//

/*
$Log: giopImpl11.cc,v $
Revision 1.1.4.24 2005/04/10 22:17:19 dgrisby
  Fixes to connection management. Thanks Jon Biggar.

Revision 1.1.4.23 2004/07/01 19:16:24 dgrisby
  Client call interceptor oneway and response expected flipped. Thanks
  John Fardo.

Revision 1.1.4.22 2004/05/05 11:02:01 dgrisby
  Bug in system exception marshalling with GIOP 1.1. Thanks Paul Haesler.

Revision 1.1.4.21 2004/02/06 16:17:45 dgrisby
  Properly handle large giopMaxMsgSize settings.

Revision 1.1.4.20 2003/07/25 16:07:18 dgrisby
  Incorrect COMM_FAILURE with GIOP 1.2 CloseConnection.

Revision 1.1.4.19 2003/01/22 11:40:12 dgrisby
  Correct serverSendException interceptor use.

Revision 1.1.4.18 2002/11/26 16:54:35 dgrisby
  Fix exception interception.

Revision 1.1.4.17 2002/11/26 14:51:51 dgrisby
  Implement missing interceptors.

Revision 1.1.4.16 2002/08/16 17:47:39 dgrisby
  Documentation, message updates. ORB tweaks to match docs.

Revision 1.1.4.15 2002/07/04 15:14:41 dgrisby
  Correct usage of MessageErrors, fix log messages.

Revision 1.1.4.14 2002/03/27 11:44:52 dpg1
  Check in interceptors things left over from last week.

Revision 1.1.4.13 2002/03/18 12:38:25 dpg1
  Lower trace(0) to trace(1), propagate fatalException.

Revision 1.1.4.12 2001/09/12 19:43:19 sll
  Enforce GIOP message size limit.

Revision 1.1.4.11 2001/09/10 17:46:10 sll
  When a connection is broken, check if it has been shutdown orderly. If so,
  do a retry.

Revision 1.1.4.10 2001/09/04 14:38:51 sll
  Added the boolean argument to notifyCommFailure to indicate if
  omniTransportLock is held by the caller.

Revision 1.1.4.9 2001/09/03 16:55:41 sll
  Modified to match the new signature of the giopStream member functions that
  previously accept explicit deadline parameters. The deadline is now
  implicit in the giopStream.

Revision 1.1.4.8 2001/08/17 17:12:37 sll
  Modularise ORB configuration parameters.

Revision 1.1.4.7 2001/07/31 16:20:29 sll
  New primitives to acquire read lock on a connection.

Revision 1.1.4.6 2001/06/20 18:35:18 sll
  Upper case send,recv,connect,shutdown to avoid silly substitution by
  macros defined in socket.h to rename these socket functions
  to something else.

Revision 1.1.4.5 2001/05/11 14:28:56 sll
  Temporarily replaced all MARSHAL_MessageSizeExceedLimit with
  MARSHAL_MessageSizeExceedLimitOnServer.

Revision 1.1.4.4 2001/05/01 17:56:29 sll
  Remove user exception check in sendUserException. This has been done by
  the caller.

Revision 1.1.4.3 2001/05/01 17:15:18 sll
  Non-copy input now works correctly.

Revision 1.1.4.2 2001/05/01 16:07:32 sll
  All GIOP implementations should now work with fragmentation and arbitrary
  sizes non-copy transfer.

Revision 1.1.4.1 2001/04/18 18:10:50 sll
  Big checkin with the brand new internal APIs.

*/

#include <omniORB4/CORBA.h>
#include <giopStream.h>
#include <giopStreamImpl.h>
#include <giopStrand.h>
#include <giopRope.h>
#include <GIOP_S.h>
#include <GIOP_C.h>
#include <omniORB4/minorCode.h>
#include <initialiser.h>
#include <exceptiondefs.h>
#include <omniORB4/callDescriptor.h>
#include <omniORB4/omniInterceptors.h>
#include <interceptors.h>
#include <orbParameters.h>

OMNI_NAMESPACE_BEGIN(omni)

class nonexistence;

// giopImpl11 groups the routines of the GIOP 1.1 stream implementation.
// Every member is static and takes the giopStream it operates on as an
// explicit argument.  The constructors and the assignment operator are
// declared private, so the class is not meant to be instantiated or copied.
class giopImpl11 {
public:
  // Message framing.
  static void outputMessageBegin(giopStream*,
                                 void (*marshalHeader)(giopStream*));
  static void outputMessageEnd(giopStream*);
  static void inputMessageBegin(giopStream*,
                                void (*unmarshalHeader)(giopStream*));
  static void inputMessageEnd(giopStream*,CORBA::Boolean disgard = 0);
  static void sendMsgErrorMessage(giopStream*);

  // Marshalling / unmarshalling of the individual message headers.
  static void marshalRequestHeader(giopStream*);
  static void sendLocateRequest(giopStream*);
  static void unmarshalReplyHeader(giopStream*);
  static void unmarshalLocateReply(giopStream*);
  static void unmarshalWildCardRequestHeader(giopStream*);
  static void unmarshalRequestHeader(giopStream*);
  static void unmarshalLocateRequest(giopStream*);
  static void marshalReplyHeader(giopStream*);

  // Replies.
  static void sendSystemException(giopStream*,const CORBA::SystemException&);
  static void sendUserException(giopStream*,const CORBA::UserException&);
  static void sendLocationForwardReply(giopStream*,CORBA::Object_ptr,
                                       CORBA::Boolean=0);
  static void sendLocateReply(giopStream*,GIOP::LocateStatusType,
                              CORBA::Object_ptr,CORBA::SystemException* p = 0);

  // Raw input / output buffer access.
  static size_t inputRemaining(giopStream*);
  static void getInputData(giopStream*,omni::alignment_t,size_t);
  static void skipInputData(giopStream*,size_t);
  static void copyInputData(giopStream*,void*, size_t,omni::alignment_t);
  static size_t outputRemaining(const giopStream*);
  static void getReserveSpace(giopStream*,omni::alignment_t,size_t);
  static void copyOutputData(giopStream*,void*, size_t,omni::alignment_t);
  static CORBA::ULong currentInputPtr(const giopStream*);
  static CORBA::ULong currentOutputPtr(const giopStream*);

  friend class nonexistence;  // Just to make gcc shut up.

  // Internal helpers used by the entry points above.
  static void inputNewFragment(giopStream* g);

  static CORBA::Boolean inputReplyBegin(giopStream*,
                                        void (*unmarshalHeader)(giopStream*));

  static void inputSkipWholeMessage(giopStream* g);

  static void inputTerminalProtocolError(giopStream* g);
  // Helper function.  Call this function to indicate that a protocol
  // violation was detected.  This function *always* raises a
  // giopStream::CommFailure exception.  Therefore the caller should not
  // expect this function to return.

  static void inputRaiseCommFailure(giopStream* g);

  static void outputNewMessage(giopStream* g);

  static void outputFlush(giopStream* g,CORBA::Boolean knownFragmentSize=0);

  static void outputSetFragmentSize(giopStream*,CORBA::ULong);

  static CORBA::Boolean outputHasReachedLimit(giopStream* g);

private:
  // Declared but not meant to be used: prevents construction and copying.
  giopImpl11();
  giopImpl11(const giopImpl11&);
  giopImpl11& operator=(const giopImpl11&);
};

////////////////////////////////////////////////////////////////////////
// Start reading an incoming message on stream g.
//
//  g               - the stream to read from.
//  unmarshalHeader - the header unmarshaller the caller expects to apply.
//                    unmarshalWildCardRequestHeader selects the
//                    request-side path; any other value selects the
//                    reply-side path, which goes through inputReplyBegin().
//
// Steps, in order:
//  1. Under omniTransportLock, wait until the input is fully buffered or
//     g holds the read lock.
//  2. Pick the current input buffer: the head of g->pd_input if there is
//     one, otherwise whatever g->inputMessage() returns.
//  3. Check the version bytes of the message header, then initialise the
//     byte-swap flag, the in-buffer markers, the "more fragments" flag and
//     the message / outstanding-fragment sizes from the header and buffer.
//  4. Unmarshal the message-specific header.  On the reply-side path a
//     false return from inputReplyBegin() restarts the whole procedure.
//
// Raises MARSHAL (via OMNIORB_THROW) when inputMessageSize() exceeds
// orbParameters::giopMaxMsgSize.
void
giopImpl11::inputMessageBegin(giopStream* g,
                              void (*unmarshalHeader)(giopStream*)) {

 again:
  {
    omni_tracedmutex_lock sync(*omniTransportLock);

    // Loop until either the data is already fully buffered or this
    // stream has obtained the read lock.
    while (!(g->inputFullyBuffered() || g->pd_rdlocked)) {
      if (!g->rdLockNonBlocking()) {
        g->sleepOnRdLock();
      }
    }
  }

  if (!g->pd_currentInputBuffer) {

    if (g->pd_input) {
      // Detach the first queued buffer and make it the current one.
      g->pd_currentInputBuffer = g->pd_input;
      g->pd_input = g->pd_input->next;
      g->pd_currentInputBuffer->next = 0;
    }
    else {
      g->pd_currentInputBuffer = g->inputMessage();
    }
  }

  // hdr points at the first byte of the message header inside the buffer.
  char* hdr = (char*)g->pd_currentInputBuffer +
                     g->pd_currentInputBuffer->start;

  // hdr[4] / hdr[5] carry the GIOP version; this implementation
  // expects 1.1.
  if (hdr[4] != 1 || hdr[5] != 1) {

    if (hdr[5] == 0 && unmarshalHeader == unmarshalWildCardRequestHeader) {
      // This is a GIOP 1.0 message, switch to the implementation of giop 1.0
      // and dispatch again.
      GIOP::Version v = { 1, 0 };
      ((giopStrand &)*g).version = v;
      g->impl(giopStreamImpl::matchVersion(v));
      OMNIORB_ASSERT(g->impl());
      g->impl()->inputMessageBegin(g,g->impl()->unmarshalWildCardRequestHeader);
      return;
    }
    // We accept a CloseConnection message with any GIOP version.
    if ((GIOP::MsgType)hdr[7] != GIOP::CloseConnection) {
      inputTerminalProtocolError(g);
      // never reaches here.
    }
  }

  // Bit 0 of hdr[6] is compared with the host byte order; swap only when
  // they differ.
  g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_)
                               ? 0 : 1 );

  // Unmarshalling starts 12 bytes past hdr (i.e. after the message header)
  // and ends at the buffer's 'last' offset.
  g->pd_inb_mkr = (void*)(hdr + 12);
  g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer +
                          g->pd_currentInputBuffer->last);

  // Bit 1 of hdr[6] says whether another fragment follows this one.
  g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0));

  g->inputMessageSize(g->pd_currentInputBuffer->size);
  // Outstanding data = declared size minus what is already in the buffer.
  g->inputFragmentToCome(g->pd_currentInputBuffer->size -
                         (g->pd_currentInputBuffer->last -
                          g->pd_currentInputBuffer->start));

  if (unmarshalHeader == unmarshalWildCardRequestHeader) {
    unmarshalHeader(g);
    if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) {
      OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnServer,
                    CORBA::COMPLETED_NO);
    }
  }
  else {
    // A false return means this message was not for us (it was handed to
    // another stream or skipped), so start over with the next message.
    if (!inputReplyBegin(g,unmarshalHeader)) goto again;
    if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) {
      OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient,
                    CORBA::COMPLETED_NO);
    }
  }
}

////////////////////////////////////////////////////////////////////////
// Reply-side continuation of inputMessageBegin().
//
// Dispatches on the message type found in hdr[7]:
//  - LocateReply / Reply : unmarshal the corresponding header.
//  - CloseConnection     : mark the strand as orderly closed and DYING,
//                          then raise giopStream::CommFailure.
//  - anything else       : inputTerminalProtocolError().
//
// Returns 1 when the reply id equals the request id of g (treated as a
// GIOP_C); the reply is then consumed by the caller.  Returns 0 when the
// message has been handed over to another client stream of the same strand
// whose request id matches, or when no stream matches and the message has
// been skipped.
CORBA::Boolean
giopImpl11::inputReplyBegin(giopStream* g,
                            void (*unmarshalHeader)(giopStream*)) {

  char* hdr = (char*)g->pd_currentInputBuffer +
                     g->pd_currentInputBuffer->start;

  GIOP::MsgType mtype = (GIOP::MsgType)hdr[7];

  switch (mtype) {
  case GIOP::LocateReply:
    {
      unmarshalLocateReply(g);
      break;
    }
  case GIOP::Reply:
    {
      unmarshalReplyHeader(g);
      break;
    }
  case GIOP::CloseConnection:
    {
      CORBA::ULong minor;
      CORBA::Boolean retry;
      g->pd_strand->orderly_closed = 1;
      g->notifyCommFailure(0,minor,retry);
      g->pd_strand->state(giopStrand::DYING);
      giopStream::CommFailure::_raise(minor,
                                      CORBA::COMPLETED_NO,
                                      retry,__FILE__,__LINE__);
      // never reach here.
      break;
    }
  default:
    inputTerminalProtocolError(g);
    // never reaches here.
  }

  GIOP_C* source = (GIOP_C*) g;

  if (source->replyId() == source->requestId()) {

    // The reply is ours; its type must agree with the unmarshaller the
    // caller asked for.
    if (mtype == GIOP::LocateReply && unmarshalHeader != unmarshalLocateReply) {
      inputTerminalProtocolError(g);
      // never reach here
    }
    else if (mtype == GIOP::Reply && unmarshalHeader != unmarshalReplyHeader) {
      inputTerminalProtocolError(g);
      // never reach here
    }
    // Delete every buffer still queued on pd_input.
    giopStream_Buffer* p = g->pd_input;
    while (p) {
      giopStream_Buffer* q = p->next;
      giopStream_Buffer::deleteBuffer(p);
      p = q;
    }
    g->pd_input = 0;
    g->inputMatchedId(1);
    return 1;
  }
  else {

    // Look through the strand's client list for the stream that issued
    // the request this reply belongs to.
    omni_tracedmutex_lock sync(*omniTransportLock);
    giopStreamList* gp = g->pd_strand->clients.next;
    for (; gp != &g->pd_strand->clients; gp = gp->next) {

      GIOP_C* target = (GIOP_C*)gp;

      if (target->state() != IOP_C::UnUsed &&
          target->requestId() == source->replyId()) {

        if (target->inputMatchedId()) {
          // a reply has already been received!
          inputTerminalProtocolError(g);
        }

        // Move our queued buffers to the target and append the current
        // buffer at the end of that list.
        target->pd_input = source->pd_input;
        source->pd_input = 0;
        giopStream_Buffer** pp = &target->pd_input;
        while (*pp) {
          pp = &((*pp)->next);
        }
        *pp = source->pd_currentInputBuffer;
        source->pd_currentInputBuffer = 0;
        // Transfer the buffering state and the read lock to the target.
        target->inputFullyBuffered(source->inputFullyBuffered());
        source->inputFullyBuffered(0);
        target->pd_rdlocked = 1;
        source->pd_rdlocked = 0;
        target->inputMatchedId(1);
        giopStream::wakeUpRdLock(g->pd_strand);
        return 0;
      }
    }
    // reach here only if there is no match to the replyid.
    // Skip this message
    inputSkipWholeMessage(g);
    return 0;
  }
}

////////////////////////////////////////////////////////////////////////
// Discard the message currently being read on g, including all of its
// remaining fragments.
//
// Deletes every buffer queued on g->pd_input, then repeatedly: deletes the
// current input buffer (if any), reads and deletes chunks until
// inputFragmentToCome() drops to zero, and fetches the next fragment for
// as long as inputExpectAnotherFragment() is set.  Finally sets pd_inb_mkr
// to pd_inb_end.
void
giopImpl11::inputSkipWholeMessage(giopStream* g) {

  giopStream_Buffer* p = g->pd_input;
  while (p) {
    giopStream_Buffer* q = p->next;
    giopStream_Buffer::deleteBuffer(p);
    p = q;
  }
  g->pd_input = 0;

  do {
    if (g->pd_currentInputBuffer) {
      giopStream_Buffer::deleteBuffer(g->pd_currentInputBuffer);
      g->pd_currentInputBuffer = 0;
    }
    // Drain the rest of the current fragment chunk by chunk.
    while (g->inputFragmentToCome()) {
      giopStream_Buffer* dummy= g->inputChunk(g->inputFragmentToCome());
      g->inputFragmentToCome( g->inputFragmentToCome() -
                              (dummy->last - dummy->start) );
      giopStream_Buffer::deleteBuffer(dummy);
    }
    if (g->inputExpectAnotherFragment()) {
      inputNewFragment(g);
    }
    else {
      break;
    }
  } while (1);
  g->pd_inb_mkr = g->pd_inb_end;
}

////////////////////////////////////////////////////////////////////////
void
giopImpl11::inputMessageEnd(giopStream* g,CORBA::Boolean disgard) {

  if ( g->pd_strand->state() != giopStrand::DYING ) {

    while ( g->inputExpectAnotherFragment() &&
            g->inputFragmentToCome() == 0   &&
            g->pd_inb_end == g->pd_inb_mkr     ) {
      // If there are more fragments to come and we do not have any
      // data left in our buffer, we keep fetching the next
      // fragment until one of the conditions is false.
      // This will cater for the case where the remote end is sending
      // the last fragment(s) with 0 body size to indicate the end of
      // a message.
      inputNewFragment(g);
    }

    if (!disgard && inputRemaining(g)) {
      if (omniORB::trace(15)) {
        omniORB::logger l;
        l << "Garbage left at the end of input message from "
          << g->pd_strand->connection->peeraddress() << "\n";
      }
      if (!orbParameters::strictIIOP) {
        disgard = 1;
      }
      else {
        inputTerminalProtocolError(g);
        // never reach here.
      }
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -