giopimpl12.cc
来自「编译工具」· CC 代码 · 共 2,182 行 · 第 1/5 页
CC
2,182 行
// -*- Mode: C++; -*-// Package : omniORB2// giopImpl12.cc Created on: 14/02/2001// Author : Sai Lai Lo (sll)//// Copyright (C) 2001 AT&T Laboratories, Cambridge//// This file is part of the omniORB library//// The omniORB library is free software; you can redistribute it and/or// modify it under the terms of the GNU Library General Public// License as published by the Free Software Foundation; either// version 2 of the License, or (at your option) any later version.//// This library is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU// Library General Public License for more details.//// You should have received a copy of the GNU Library General Public// License along with this library; if not, write to the Free// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA // 02111-1307, USA////// Description:// *** PROPRIETORY INTERFACE ***// /* $Log: giopImpl12.cc,v $ Revision 1.1.4.30 2005/10/17 15:49:10 dgrisby Missing newline at end of log message. Revision 1.1.4.29 2005/04/10 22:17:19 dgrisby Fixes to connection management. Thanks Jon Biggar. Revision 1.1.4.28 2005/02/02 00:21:07 dgrisby Memory leak with CloseConnection or error received in a queued message. Revision 1.1.4.27 2004/10/18 00:23:54 dgrisby Bug in trying to set up bidirectional GIOP with GIOP 1.0 and 1.1. Revision 1.1.4.26 2004/07/01 19:16:23 dgrisby Client call interceptor oneway and response expected flipped. Thanks John Fardo. Revision 1.1.4.25 2004/06/18 14:45:24 dgrisby Obscure and rare bug in GIOP 1.2 system exception sending. Revision 1.1.4.24 2004/05/25 14:02:22 dgrisby Properly close bidirectional connections. Revision 1.1.4.23 2004/02/06 16:17:45 dgrisby Properly handle large giopMaxMsgSize settings. Revision 1.1.4.22 2003/11/19 10:42:09 dgrisby Locking bug with comm failure in bidirectional GIOP. Revision 1.1.4.21 2003/07/16 14:22:38 dgrisby Speed up oneway handling a little. More tracing for split messages. Revision 1.1.4.20 2003/01/22 11:40:12 dgrisby Correct serverSendException interceptor use. Revision 1.1.4.19 2002/12/18 17:51:03 dgrisby Respond nicely if we receive a request message on a client strand. Revision 1.1.4.18 2002/11/26 16:54:35 dgrisby Fix exception interception. Revision 1.1.4.17 2002/11/26 14:51:52 dgrisby Implement missing interceptors. Revision 1.1.4.16 2002/07/04 15:14:41 dgrisby Correct usage of MessageErrors, fix log messages. Revision 1.1.4.15 2002/03/27 11:44:52 dpg1 Check in interceptors things left over from last week. Revision 1.1.4.14 2002/03/18 12:38:26 dpg1 Lower trace(0) to trace(1), propagate fatalException. Revision 1.1.4.13 2001/09/12 19:43:19 sll Enforce GIOP message size limit. Revision 1.1.4.12 2001/09/10 17:46:10 sll When a connection is broken, check if it has been shutdown orderly. If so, do a retry. Revision 1.1.4.11 2001/09/04 14:38:52 sll Added the boolean argument to notifyCommFailure to indicate if omniTransportLock is held by the caller. Revision 1.1.4.10 2001/09/03 16:55:41 sll Modified to match the new signature of the giopStream member functions that previously accept explicit deadline parameters. The deadline is now implicit in the giopStream. Revision 1.1.4.9 2001/08/17 17:12:37 sll Modularise ORB configuration parameters. Revision 1.1.4.8 2001/07/31 16:20:29 sll New primitives to acquire read lock on a connection. Revision 1.1.4.7 2001/07/13 15:23:51 sll Call notifyCallFullyBuffered when a request has arrived and fully buffered. Revision 1.1.4.6 2001/06/20 18:35:18 sll Upper case send,recv,connect,shutdown to avoid silly substutition by macros defined in socket.h to rename these socket functions to something else. Revision 1.1.4.5 2001/05/11 14:28:56 sll Temporarily replaced all MARSHAL_MessageSizeExceedLimit with MARSHAL_MessageSizeExceedLimitOnServer. Revision 1.1.4.4 2001/05/01 17:56:29 sll Remove user exception check in sendUserException. This has been done by the caller. Revision 1.1.4.3 2001/05/01 17:15:17 sll Non-copy input now works correctly. Revision 1.1.4.2 2001/05/01 16:07:32 sll All GIOP implementations should now work with fragmentation and abitrary sizes non-copy transfer. Revision 1.1.4.1 2001/04/18 18:10:50 sll Big checkin with the brand new internal APIs.*/#include <omniORB4/CORBA.h>#include <giopStream.h>#include <giopStreamImpl.h>#include <giopStrand.h>#include <giopRope.h>#include <giopServer.h>#include <GIOP_S.h>#include <GIOP_C.h>#include <omniORB4/minorCode.h>#include <initialiser.h>#include <exceptiondefs.h>#include <omniORB4/callDescriptor.h>#include <omniORB4/omniInterceptors.h>#include <interceptors.h>#include <orbParameters.h>OMNI_NAMESPACE_BEGIN(omni)class nonexistence;class giopImpl12 {public: static void outputMessageBegin(giopStream*, void (*marshalHeader)(giopStream*)); static void outputMessageEnd(giopStream*); static void inputMessageBegin(giopStream*, void (*unmarshalHeader)(giopStream*)); static void inputMessageEnd(giopStream*,CORBA::Boolean disgard = 0); static void sendMsgErrorMessage(giopStream*); static void marshalRequestHeader(giopStream*); static void sendLocateRequest(giopStream*); static void unmarshalReplyHeader(giopStream*); static void unmarshalLocateReply(giopStream*); static void unmarshalWildCardRequestHeader(giopStream*); static void unmarshalRequestHeader(giopStream*); static void unmarshalLocateRequest(giopStream*); static void marshalReplyHeader(giopStream*); static void sendSystemException(giopStream*,const CORBA::SystemException&); static void sendUserException(giopStream*,const CORBA::UserException&); static void sendLocationForwardReply(giopStream*,CORBA::Object_ptr, CORBA::Boolean=0); static void sendLocateReply(giopStream*,GIOP::LocateStatusType, CORBA::Object_ptr,CORBA::SystemException* p = 0); static size_t inputRemaining(giopStream*); static void getInputData(giopStream*,omni::alignment_t,size_t); static void skipInputData(giopStream*,size_t); static void copyInputData(giopStream*,void*, size_t,omni::alignment_t); static size_t outputRemaining(const giopStream*); static void getReserveSpace(giopStream*,omni::alignment_t,size_t); static void copyOutputData(giopStream*,void*, size_t,omni::alignment_t); static CORBA::ULong currentInputPtr(const giopStream*); static CORBA::ULong currentOutputPtr(const giopStream*); friend class nonexistence; // Just to make gcc shut up. static void inputNewFragment(giopStream* g); static void inputNewServerMessage(giopStream* g); static void inputQueueMessage(giopStream*,giopStream_Buffer*); static void inputReplyBegin(giopStream*, void (*unmarshalHeader)(giopStream*)); static void inputSkipWholeMessage(giopStream* g); static void inputTerminalProtocolError(giopStream* g, const char* file, int line); // Helper function. Call this function to indicate that a protocol // voilation was detected. This function *always* raise a // giopStream::CommFailure exception. Therefore the caller should not // expect this function to return. static void inputRaiseCommFailure(giopStream* g); static void outputNewMessage(giopStream* g); static void outputFlush(giopStream* g,CORBA::Boolean knownFragmentSize=0); static void outputSetFragmentSize(giopStream*,CORBA::ULong); static CORBA::Boolean outputHasReachedLimit(giopStream* g);private: giopImpl12(); giopImpl12(const giopImpl12&); giopImpl12& operator=(const giopImpl12&);};////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputQueueMessage(giopStream* g,giopStream_Buffer* b) { // On entry, this function owns the giopStream_Buffer. On exit, the // buffer has either been assigned to another owner or deleted. unsigned char* hdr = (unsigned char*)b + b->start; GIOP::MsgType mtype = (GIOP::MsgType)hdr[7]; if (hdr[4] != 1 || hdr[5] != 2 || mtype > GIOP::Fragment) { // Notice that we only accept GIOP 1.2 packets here. That is, any // GIOP 1.0 or 1.1. messages will be rejected. // While the spec. say that a GIOP 1.2 message can interleave with // another 1.2 message. It does not say if a GIOP 1.0 or 1.1 message // can interleave with a GIOP 1.2 message. Our interpretation is to // disallow this. // // *** HERE: when we support GIOP > 1.2, we can get here (or // rather the equivalent place in GIOP 1.3+) due to receiving a // reply message on the receiver thread of a bidirectional // connection. // We accept a CloseConnection message with any GIOP version. if ((GIOP::MsgType)hdr[7] != GIOP::CloseConnection) { giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } } CORBA::ULong reqid; if ( mtype != GIOP::CloseConnection && mtype != GIOP::MessageError) { // unmarshal request id. reqid = *((CORBA::ULong*)(hdr + 12)); if ((hdr[6] & 0x1) != _OMNIORB_HOST_BYTE_ORDER_) { CORBA::ULong v = reqid; reqid = ((((v) & 0xff000000) >> 24) | (((v) & 0x00ff0000) >> 8) | (((v) & 0x0000ff00) << 8) | (((v) & 0x000000ff) << 24)); } } else if ( mtype == GIOP::MessageError) { giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } else if ( g->pd_strand->isClient() || g->pd_strand->biDir) { // orderly shutdown. CORBA::ULong minor; CORBA::Boolean retry; giopStream_Buffer::deleteBuffer(b); g->pd_strand->orderly_closed = 1; g->notifyCommFailure(0,minor,retry); g->pd_strand->state(giopStrand::DYING); giopStream::CommFailure::_raise(minor, CORBA::COMPLETED_NO, retry,__FILE__,__LINE__); // never reach here } else { giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } giopStream* matched_target = 0; CORBA::Boolean matched_target_is_client = 0; omniTransportLock->lock(); // Look at clients switch (mtype) { case GIOP::Reply: case GIOP::LocateReply: if (!(g->pd_strand->isClient() || g->pd_strand->biDir)) { omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } // falls through case GIOP::Fragment: { // search for a matching id in clients. giopStreamList* gp = g->pd_strand->clients.next; for (; gp != &g->pd_strand->clients; gp = gp->next) { GIOP_C* target = (GIOP_C*)gp; if (target->state() != IOP_C::UnUsed && target->requestId() == reqid) { // sanity check if (target->inputFullyBuffered()) { // a reply has already been received! omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } if (target->inputMatchedId()) { if (mtype != GIOP::Fragment) { // already got the header omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } } else if (mtype == GIOP::Fragment) { // receive body before the header omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } else { target->inputMatchedId(1); } matched_target = (giopStream*)target; matched_target_is_client = 1; break; } } } default: break; } if (!matched_target) { // look at servers switch (mtype) { case GIOP::Request: case GIOP::LocateRequest: { if (g->pd_strand->isClient() && !g->pd_strand->biDir) { omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } // Make sure this request id has not been seen before GIOP_S* unused = 0; giopStreamList* gp = g->pd_strand->servers.next; for (; gp != &g->pd_strand->servers; gp = gp->next) { GIOP_S* target = (GIOP_S*)gp; if (target->state() == IOP_S::UnUsed) { unused = target; } else if (target->requestId() == reqid) { // already have a request with the same id. omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } } if (!unused) { OMNIORB_ASSERT(g->pd_strand->servers.next != &g->pd_strand->servers); GIOP_S* copy = (GIOP_S*)g->pd_strand->servers.next; unused = new GIOP_S(*copy); unused->giopStreamList::insert(g->pd_strand->servers); } unused->state(IOP_S::InputPartiallyBuffered); unused->requestId(reqid); } // falls through case GIOP::CancelRequest: if (g->pd_strand->isClient() && !g->pd_strand->biDir) { omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } // falls through case GIOP::Fragment: { // search for a matching id in servers. giopStreamList* gp = g->pd_strand->servers.next; for (; gp != &g->pd_strand->servers; gp = gp->next) { GIOP_S* target = (GIOP_S*)gp; if (target->state() != IOP_S::UnUsed && target->requestId() == reqid) { // sanity check if (target->inputFullyBuffered()) { // a reply has already been received! omniTransportLock->unlock(); giopStream_Buffer::deleteBuffer(b); inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } matched_target = (giopStream*)target; break; } } } default: break; } }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?