giopimpl10.cc
来自「编译工具」· CC 代码 · 共 1,684 行 · 第 1/4 页
CC
1,684 行
// -*- Mode: C++; -*-// Package : omniORB2// giopImpl10.cc Created on: 14/02/2001// Author : Sai Lai Lo (sll)//// Copyright (C) 2001 AT&T Laboratories, Cambridge//// This file is part of the omniORB library//// The omniORB library is free software; you can redistribute it and/or// modify it under the terms of the GNU Library General Public// License as published by the Free Software Foundation; either// version 2 of the License, or (at your option) any later version.//// This library is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU// Library General Public License for more details.//// You should have received a copy of the GNU Library General Public// License along with this library; if not, write to the Free// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA // 02111-1307, USA////// Description:// *** PROPRIETORY INTERFACE ***// /* $Log: giopImpl10.cc,v $ Revision 1.1.4.22 2005/04/10 22:17:19 dgrisby Fixes to connection management. Thanks Jon Biggar. Revision 1.1.4.21 2004/07/01 19:16:25 dgrisby Client call interceptor oneway and response expected flipped. Thanks John Fardo. Revision 1.1.4.20 2003/07/25 16:07:18 dgrisby Incorrect COMM_FAILURE with GIOP 1.2 CloseConnection. Revision 1.1.4.19 2003/01/22 11:40:12 dgrisby Correct serverSendException interceptor use. Revision 1.1.4.18 2002/11/26 16:54:34 dgrisby Fix exception interception. Revision 1.1.4.17 2002/11/26 14:51:50 dgrisby Implement missing interceptors. Revision 1.1.4.16 2002/07/04 15:14:40 dgrisby Correct usage of MessageErrors, fix log messages. Revision 1.1.4.15 2002/03/27 11:44:51 dpg1 Check in interceptors things left over from last week. Revision 1.1.4.14 2002/03/18 12:38:25 dpg1 Lower trace(0) to trace(1), propagate fatalException. Revision 1.1.4.13 2001/10/19 11:06:45 dpg1 Principal support for GIOP 1.0. Correct some spelling mistakes. Revision 1.1.4.12 2001/09/12 19:43:19 sll Enforce GIOP message size limit. Revision 1.1.4.11 2001/09/10 17:46:09 sll When a connection is broken, check if it has been shutdown orderly. If so, do a retry. Revision 1.1.4.10 2001/09/04 14:38:51 sll Added the boolean argument to notifyCommFailure to indicate if omniTransportLock is held by the caller. Revision 1.1.4.9 2001/09/03 16:55:41 sll Modified to match the new signature of the giopStream member functions that previously accept explicit deadline parameters. The deadline is now implicit in the giopStream. Revision 1.1.4.8 2001/08/17 17:12:36 sll Modularise ORB configuration parameters. Revision 1.1.4.7 2001/07/31 16:20:30 sll New primitives to acquire read lock on a connection. Revision 1.1.4.6 2001/06/20 18:35:18 sll Upper case send,recv,connect,shutdown to avoid silly substutition by macros defined in socket.h to rename these socket functions to something else. Revision 1.1.4.5 2001/05/11 14:30:12 sll Message size limit is now enforced. Revision 1.1.4.4 2001/05/04 13:55:58 sll Silly mistake that causes non-copy marshalling to do the wrong thing in GIOP 1.0. Revision 1.1.4.3 2001/05/01 17:56:29 sll Remove user exception check in sendUserException. This has been done by the caller. Revision 1.1.4.2 2001/05/01 17:15:18 sll Non-copy input now works correctly. Revision 1.1.4.1 2001/04/18 18:10:51 sll Big checkin with the brand new internal APIs.*/#include <omniORB4/CORBA.h>#include <giopStream.h>#include <giopStreamImpl.h>#include <giopStrand.h>#include <giopRope.h>#include <GIOP_S.h>#include <GIOP_C.h>#include <omniORB4/minorCode.h>#include <initialiser.h>#include <exceptiondefs.h>#include <omniORB4/callDescriptor.h>#include <omniORB4/omniInterceptors.h>#include <interceptors.h>#include <orbParameters.h>// Define PRE_CALCULATE_MESSAGE_SIZE to force the code to pre-calculate// the total size of the message before the actual marshalling.// The implementation can actually avoid this calculation if full message// can be fully buffered.// We however have to force pre-calculation because at the moment, the Any// marshalling implementation is not re-entrant. #define PRE_CALCULATE_MESSAGE_SIZEOMNI_NAMESPACE_BEGIN(omni)class nonexistence;class giopImpl10 {public: static void outputMessageBegin(giopStream*, void (*marshalHeader)(giopStream*)); static void outputMessageEnd(giopStream*); static void inputMessageBegin(giopStream*, void (*unmarshalHeader)(giopStream*)); static void inputMessageEnd(giopStream*,CORBA::Boolean skip = 0); static void sendMsgErrorMessage(giopStream*); static void marshalRequestHeader(giopStream*); static void sendLocateRequest(giopStream*); static void unmarshalReplyHeader(giopStream*); static void unmarshalLocateReply(giopStream*); static void unmarshalWildCardRequestHeader(giopStream*); static void unmarshalRequestHeader(giopStream*); static void unmarshalLocateRequest(giopStream*); static void marshalReplyHeader(giopStream*); static void sendSystemException(giopStream*,const CORBA::SystemException&); static void sendUserException(giopStream*,const CORBA::UserException&); static void sendLocationForwardReply(giopStream*,CORBA::Object_ptr, CORBA::Boolean=0); static void sendLocateReply(giopStream*,GIOP::LocateStatusType, CORBA::Object_ptr,CORBA::SystemException* p = 0); static size_t inputRemaining(giopStream*); static void getInputData(giopStream*,omni::alignment_t,size_t); static void skipInputData(giopStream*,size_t); static void copyInputData(giopStream*,void*, size_t,omni::alignment_t); static size_t outputRemaining(const giopStream*); static void getReserveSpace(giopStream*,omni::alignment_t,size_t); static void copyOutputData(giopStream*,void*, size_t,omni::alignment_t); static CORBA::ULong currentInputPtr(const giopStream*); static CORBA::ULong currentOutputPtr(const giopStream*); friend class nonexistence; // Just to make gcc shut up. static CORBA::Boolean inputReplyBegin(giopStream*, void (*unmarshalHeader)(giopStream*)); static void inputTerminalProtocolError(giopStream* g); // Helper function. Call this function to indicate that a protocol // voilation was detected. This function *always* raise a // giopStream::CommFailure exception. Therefore the caller should not // expect this function to return. static void inputRaiseCommFailure(giopStream* g); static void outputNewMessage(giopStream*); static void outputFlush(giopStream* g); static void outputSetMessageSize(giopStream*,CORBA::ULong);private: giopImpl10(); giopImpl10(const giopImpl10&); giopImpl10& operator=(const giopImpl10&);};////////////////////////////////////////////////////////////////////////voidgiopImpl10::inputMessageBegin(giopStream* g, void (*unmarshalHeader)(giopStream*)) { again: { omni_tracedmutex_lock sync(*omniTransportLock); while (!(g->inputFullyBuffered() || g->pd_rdlocked)) { if (!g->rdLockNonBlocking()) { g->sleepOnRdLock(); } } } if (!g->pd_currentInputBuffer) { if (g->pd_input) { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_input->next; g->pd_currentInputBuffer->next = 0; } else { g->pd_currentInputBuffer = g->inputMessage(); } } char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; if (hdr[4] != 1 || hdr[5] != 0) { // We accept a CloseConnection message with any GIOP version. if ((GIOP::MsgType)hdr[7] != GIOP::CloseConnection) { inputTerminalProtocolError(g); // never reaches here. } } g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_) ? 0 : 1 ); g->pd_inb_mkr = (void*)(hdr + 12); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputExpectAnotherFragment(0); g->inputMessageSize(g->pd_currentInputBuffer->size); g->inputFragmentToCome(g->pd_currentInputBuffer->size - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start)); if (unmarshalHeader == unmarshalWildCardRequestHeader) unmarshalHeader(g); else { if (!inputReplyBegin(g,unmarshalHeader)) goto again; }}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopImpl10::inputReplyBegin(giopStream* g, void (*unmarshalHeader)(giopStream*)) { if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) { // This is not the perfect solution if we are multiplexing calls onto // this connection. The reply we are getting here may not be for the // call of this thread. If that is the case, we are sending the wrong // exception to the threads. On the otherhand, the server side is // quite doggy and it is not prudent to trust the message content. // Therefore, decoding the message further may not be doing any good. OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient, CORBA::COMPLETED_YES); } char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; GIOP::MsgType mtype = (GIOP::MsgType)hdr[7]; switch (mtype) { case GIOP::LocateReply: { unmarshalLocateReply(g); break; } case GIOP::Reply: { unmarshalReplyHeader(g); break; } case GIOP::CloseConnection: { CORBA::ULong minor; CORBA::Boolean retry; g->pd_strand->orderly_closed = 1; g->notifyCommFailure(0,minor,retry); g->pd_strand->state(giopStrand::DYING); giopStream::CommFailure::_raise(minor, CORBA::COMPLETED_NO, retry,__FILE__,__LINE__); // never reach here. break; } default: inputTerminalProtocolError(g); // never reaches here. } GIOP_C* source = (GIOP_C*) g; if (source->replyId() == source->requestId()) { if (mtype == GIOP::LocateReply && unmarshalHeader != unmarshalLocateReply) { inputTerminalProtocolError(g); // never reach here } else if (mtype == GIOP::Reply && unmarshalHeader != unmarshalReplyHeader) { inputTerminalProtocolError(g); // never reach here } giopStream_Buffer* p = g->pd_input; while (p) { giopStream_Buffer* q = p->next; giopStream_Buffer::deleteBuffer(p); p = q; } g->pd_input = 0; g->inputMatchedId(1); return 1; } else { omni_tracedmutex_lock sync(*omniTransportLock); giopStreamList* gp = g->pd_strand->clients.next; for (; gp != &g->pd_strand->clients; gp = gp->next) { GIOP_C* target = (GIOP_C*)gp; if (target->state() != IOP_C::UnUsed && target->requestId() == source->replyId()) { if (target->inputMatchedId()) { // a reply has already been received! inputTerminalProtocolError(g); } target->pd_input = source->pd_input; source->pd_input = 0; giopStream_Buffer** pp = &target->pd_input; while (*pp) { pp = &((*pp)->next); } *pp = source->pd_currentInputBuffer; source->pd_currentInputBuffer = 0; target->inputFullyBuffered(source->inputFullyBuffered()); source->inputFullyBuffered(0); target->pd_rdlocked = 1; source->pd_rdlocked = 0; target->inputMatchedId(1); giopStream::wakeUpRdLock(g->pd_strand); return 0; } } // reach here only if there is no match to the replyid. // Skip this message giopStream_Buffer* p = g->pd_input; while (p) { giopStream_Buffer* q = p->next; giopStream_Buffer::deleteBuffer(p); p = q; } g->pd_input = 0; if (g->pd_currentInputBuffer) { giopStream_Buffer::deleteBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } while (g->inputFragmentToCome()) { giopStream_Buffer* dummy = g->inputChunk(g->inputFragmentToCome()); g->inputFragmentToCome( g->inputFragmentToCome() - (dummy->last - dummy->start) ); giopStream_Buffer::deleteBuffer(dummy); } return 0; }}////////////////////////////////////////////////////////////////////////voidgiopImpl10::inputMessageEnd(giopStream* g,CORBA::Boolean disgard) { if ( g->pd_strand->state() != giopStrand::DYING ) { if (!disgard && inputRemaining(g)) { if (omniORB::trace(15)) { omniORB::logger l; l << "Garbage left at the end of input message from " << g->pd_strand->connection->peeraddress() << "\n"; } if (!orbParameters::strictIIOP) { disgard = 1; } else { inputTerminalProtocolError(g); // never reach here. } } if (disgard) skipInputData(g,inputRemaining(g)); if (g->pd_currentInputBuffer) { g->releaseInputBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?