📄 giopstream.cc
字号:
// -*- Mode: C++; -*-// Package : omniORB// giopStream.cc Created on: 16/01/2001// Author : Sai Lai Lo (sll)//// Copyright (C) 2001 AT&T Laboratories Cambridge//// This file is part of the omniORB library//// The omniORB library is free software; you can redistribute it and/or// modify it under the terms of the GNU Library General Public// License as published by the Free Software Foundation; either// version 2 of the License, or (at your option) any later version.//// This library is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU// Library General Public License for more details.//// You should have received a copy of the GNU Library General Public// License along with this library; if not, write to the Free// Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA // 02111-1307, USA////// Description:// /* $Log: giopStream.cc,v $ Revision 1.1.4.30 2005/10/13 11:38:16 dgrisby Dump CloseConnection messages. Revision 1.1.4.29 2005/03/03 12:55:55 dgrisby Minor log output clean-up. Thanks Peter Klotz. Revision 1.1.4.28 2004/07/23 13:25:44 dgrisby New traceExceptions option. Revision 1.1.4.27 2003/09/25 13:47:44 dgrisby Log connection attempts. Revision 1.1.4.26 2003/07/30 14:25:28 dgrisby Increase direct send cut-off to 16k. Revision 1.1.4.25 2003/07/16 14:22:38 dgrisby Speed up oneway handling a little. More tracing for split messages. Revision 1.1.4.24 2003/01/28 13:37:06 dgrisby Send GIOP dumps to logger. Thanks Matej Kenda. Revision 1.1.4.23 2002/11/12 16:40:11 dgrisby Fix incorrect delete. Revision 1.1.4.22 2002/02/25 11:17:13 dpg1 Use tracedmutexes everywhere. Revision 1.1.4.21 2002/02/13 16:02:39 dpg1 Stability fixes thanks to Bastiaan Bakker, plus threading optimisations inspired by investigating Bastiaan's bug reports. Revision 1.1.4.20 2001/10/17 16:33:28 dpg1 New downcast mechanism for cdrStreams. 
Revision 1.1.4.19 2001/09/26 10:48:11 sll Fixed a bug which causes problems when, in a single recv(), the ORB reads more than one GIOP message into its buffer and the last of these messages is only partially read. Revision 1.1.4.18 2001/09/10 17:53:07 sll In inputMessage, if a strand is dying and has been orderly_closed, i.e. a GIOP CloseConnection has been received, set the retry flag in the CommFailure exception. Revision 1.1.4.17 2001/09/04 14:44:25 sll Added the boolean argument to notifyCommFailure to indicate if omniTransportLock is held by the caller. Make sure the reference counts rd_nwaiting and wr_nwaiting are correct when a timeout has occurred. Revision 1.1.4.16 2001/09/03 16:51:01 sll Added the deadline parameter and access functions. All member functions that previously had deadline arguments now use the per-object deadline implicitly. Revision 1.1.4.15 2001/08/06 15:51:28 sll In errorOnSend, make sure that the retry flag returned by notifyCommFailure is not overwritten if the send failed on TRANSIENT_ConnectFailed. Revision 1.1.4.14 2001/08/03 17:41:21 sll System exception minor code overhaul. When a system exception is raised, a meaningful minor code is provided. Revision 1.1.4.13 2001/07/31 16:20:29 sll New primitives to acquire read lock on a connection. Revision 1.1.4.12 2001/07/13 15:28:36 sll Added more tracing messages. Revision 1.1.4.11 2001/07/03 12:01:16 dpg1 Minor correction to log message for platforms without C++ bool. Revision 1.1.4.10 2001/06/20 18:35:17 sll Upper case send,recv,connect,shutdown to avoid silly substitution by macros defined in socket.h to rename these socket functions to something else. Revision 1.1.4.9 2001/05/11 14:25:53 sll Added operator for omniORB::logger to report system exception status and minor code. Revision 1.1.4.8 2001/05/01 17:15:17 sll Non-copy input now works correctly. Revision 1.1.4.7 2001/05/01 16:07:31 sll All GIOP implementations should now work with fragmentation and arbitrary-size non-copy transfer. 
Revision 1.1.4.6 2001/04/18 18:10:48 sll Big checkin with the brand new internal APIs. */

#include <omniORB4/CORBA.h>
#include <exceptiondefs.h>
#include <giopStream.h>
#include <giopStrand.h>
#include <giopStreamImpl.h>
#include <omniORB4/minorCode.h>
#include <stdio.h>

OMNI_NAMESPACE_BEGIN(omni)

////////////////////////////////////////////////////////////////////////
// Default values of the class-wide tuning parameters.
// Presumably these are byte counts (the change log above refers to the
// direct send cut-off as "16k") -- confirm against the code that reads
// them.
CORBA::ULong giopStream::directSendCutOff = 16384;
CORBA::ULong giopStream::directReceiveCutOff = 1024;
CORBA::ULong giopStream::bufferSize = 8192;

////////////////////////////////////////////////////////////////////////
// Construct a stream bound to <strand>. Every other member starts out
// as zero: no read or write lock held, no implementation selected
// (pd_impl), no deadline, no input or output buffers, and all message
// bookkeeping cleared.
giopStream::giopStream(giopStrand* strand) : 
  pd_strand(strand),
  pd_rdlocked(0),
  pd_wrlocked(0),
  pd_impl(0),
  pd_deadline_secs(0),
  pd_deadline_nanosecs(0),
  pd_currentInputBuffer(0),
  pd_input(0),
  pd_inputFullyBuffered(0),
  pd_inputMatchedId(0),
  pd_inputExpectAnotherFragment(0),
  pd_inputFragmentToCome(0),
  pd_inputMessageSize(0),
  pd_currentOutputBuffer(0),
  pd_outputFragmentSize(0),
  pd_outputMessageSize(0),
  pd_request_id(0)
{
}

////////////////////////////////////////////////////////////////////////
// Release every buffer still attached to the stream: the chain of
// queued input buffers starting at pd_input (linked through 'next'),
// the current input buffer and the current output buffer.
giopStream::~giopStream() {
  giopStream_Buffer* p = pd_input;
  while (p) {
    // Remember the successor before the node is deleted.
    giopStream_Buffer* q = p->next;
    giopStream_Buffer::deleteBuffer(p);
    p = q;
  }
  pd_input = 0;
  if (pd_currentInputBuffer) {
    giopStream_Buffer::deleteBuffer(pd_currentInputBuffer);
    pd_currentInputBuffer = 0;
  }
  if (pd_currentOutputBuffer) {
    giopStream_Buffer::deleteBuffer(pd_currentOutputBuffer);
    pd_currentOutputBuffer = 0;
  }
}

////////////////////////////////////////////////////////////////////////
// Downcast helper keyed on the address of a class's _classid member.
// Returns 'this' converted to giopStream* or cdrStream* when <cptr>
// is the address of the corresponding _classid, and 0 for any other
// value.
void*
giopStream::ptrToClass(int* cptr)
{
  if (cptr == &giopStream::_classid) return (giopStream*)this;
  if (cptr == &cdrStream ::_classid) return (cdrStream*) this;
  return 0;
}

// Only the address of this member is used (see ptrToClass above); it
// serves as the identity tag for giopStream.
int giopStream::_classid;

////////////////////////////////////////////////////////////////////////
// Return the input side of the stream to its initial state: delete all
// queued input buffers and the current input buffer, then call
// inputFullyBuffered(0), inputMatchedId(0) and setDeadline(0,0).
// The current output buffer and the lock flags are not touched here.
void
giopStream::reset() {
  giopStream_Buffer* p = pd_input;
  while (p) {
    // Remember the successor before the node is deleted.
    giopStream_Buffer* q = p->next;
    giopStream_Buffer::deleteBuffer(p);
    p = q;
  }
  pd_input = 0;
  if (pd_currentInputBuffer) {
    giopStream_Buffer::deleteBuffer(pd_currentInputBuffer);
    pd_currentInputBuffer = 0;
  }
  inputFullyBuffered(0);
  inputMatchedId(0);
  setDeadline(0,0);
}

////////////////////////////////////////////////////////////////////////
// Return the version reported by the attached implementation object.
// NOTE(review): pd_impl is 0 straight after construction and is
// dereferenced unconditionally here, so it has to be set elsewhere
// before this is called -- confirm against the callers.
GIOP::Version
giopStream::version() {
  return pd_impl->version();
}

////////////////////////////////////////////////////////////////////////
// Acquire the strand's read lock for this stream, blocking until it is
// available.
//
// Preconditions (asserted): the check on omniTransportLock passes and
// this stream does not already hold the read lock.
//
// pd_strand->rd_nwaiting encodes both the lock state and the number of
// threads registered on rdcond, as can be read off the arithmetic in
// the lock and unlock functions:
//    value >= 0 : lock is free, value threads are registered
//    value <  0 : lock is held, (-value - 1) threads are registered
// Taking or releasing the lock is therefore  v = -v - 1.
//
// If a deadline is set (either pd_deadline_secs or pd_deadline_nanosecs
// is non-zero) the wait is a timedwait; when that reports a timeout,
// errorOnReceive() is called after this thread has removed itself from
// the count. Presumably errorOnReceive() throws and does not return --
// confirm; if it did return, the loop would simply try again.
void
giopStream::rdLock() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  OMNIORB_ASSERT(!pd_rdlocked);

  while (pd_strand->rd_nwaiting < 0) {
    // Lock is held: register as a waiter (the value is negative, so
    // decrementing adds one to the waiter count).
    pd_strand->rd_nwaiting--;

    CORBA::Boolean hastimeout = 0;

    // Now blocks.
    if (!(pd_deadline_secs || pd_deadline_nanosecs))
      pd_strand->rdcond.wait();
    else {
      hastimeout = !(pd_strand->rdcond.timedwait(pd_deadline_secs,
                                                 pd_deadline_nanosecs));
    }

    // Deregister as a waiter. Which direction reduces the waiter count
    // depends on whether the lock is currently free (non-negative
    // value) or still held (negative value).
    if (pd_strand->rd_nwaiting >= 0)
      pd_strand->rd_nwaiting--;
    else
      pd_strand->rd_nwaiting++;

    if (hastimeout) {
      // Timeout.
      errorOnReceive(0,__FILE__,__LINE__,0,1);
    }
  }
  // Lock is free: take it, preserving the waiter count in the encoding.
  pd_strand->rd_nwaiting = -pd_strand->rd_nwaiting - 1;

  pd_rdlocked = 1;
}

////////////////////////////////////////////////////////////////////////
// Release the strand's read lock if this stream holds it; otherwise do
// nothing. After flipping rd_nwaiting back to its "free" encoding, wake
// the registered waiters, if any: a single signal when none of them is
// a "just waiting" thread (rd_n_justwaiting == 0), otherwise
// wakeUpRdLock() for the reason given in the comment below.
void
giopStream::rdUnLock() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  if (!pd_rdlocked) return;

  OMNIORB_ASSERT(pd_strand->rd_nwaiting < 0);
  // Held -> free; the result is the number of registered waiters.
  pd_strand->rd_nwaiting = -pd_strand->rd_nwaiting - 1;
  if (pd_strand->rd_nwaiting > 0) {
    if (pd_strand->rd_n_justwaiting == 0) {
      pd_strand->rdcond.signal();
    }
    else {
      // There are threads which have blocked on the read lock's condition
      // variable and do not try to acquire the read lock. If we signal
      // just one thread, the thread got woken up may just be of this type.
      // The other threads which do want the read lock may not get
      // it. The end result is no thread gets the read lock and the threads
      // that want it are still blocked in the condition variable.
      // To prevent this from happening, we wake all threads up.
      wakeUpRdLock(pd_strand);
    }
  }
  pd_rdlocked = 0;
}

////////////////////////////////////////////////////////////////////////
// Acquire the strand's write lock for this stream, blocking until it is
// available. Mirrors rdLock(), using wr_nwaiting and wrcond:
//    wr_nwaiting >= 0 : lock is free, value threads are registered
//    wr_nwaiting <  0 : lock is held, (-value - 1) threads are registered
//
// NOTE(review): a timeout while waiting for the *write* lock is
// reported through errorOnReceive(), exactly as in rdLock(). The change
// log above mentions an errorOnSend() in this file; confirm whether the
// receive-side handler is really intended here.
void
giopStream::wrLock() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  OMNIORB_ASSERT(!pd_wrlocked);

  while (pd_strand->wr_nwaiting < 0) {
    // Lock is held: register as a waiter.
    pd_strand->wr_nwaiting--;

    CORBA::Boolean hastimeout = 0;

    // Now blocks.
    if (!(pd_deadline_secs || pd_deadline_nanosecs))
      pd_strand->wrcond.wait();
    else {
      hastimeout = !(pd_strand->wrcond.timedwait(pd_deadline_secs,
                                                 pd_deadline_nanosecs));
    }

    // Deregister as a waiter; the direction depends on whether the
    // lock is currently free or still held.
    if (pd_strand->wr_nwaiting >= 0)
      pd_strand->wr_nwaiting--;
    else
      pd_strand->wr_nwaiting++;

    if (hastimeout) {
      // Timeout.
      errorOnReceive(0,__FILE__,__LINE__,0,1);
    }

  }
  // Lock is free: take it, preserving the waiter count in the encoding.
  pd_strand->wr_nwaiting = -pd_strand->wr_nwaiting - 1;

  pd_wrlocked = 1;
}

////////////////////////////////////////////////////////////////////////
// Release the strand's write lock if this stream holds it; otherwise do
// nothing. One registered waiter, if any, is signalled. In addition,
// when the strand is in the DYING state and some threads are "just
// waiting" on the read condition variable, those are woken as well (see
// the comment below).
void
giopStream::wrUnLock() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  if (!pd_wrlocked) return;

  OMNIORB_ASSERT(pd_strand->wr_nwaiting < 0);
  // Held -> free; the result is the number of registered waiters.
  pd_strand->wr_nwaiting = -pd_strand->wr_nwaiting - 1;
  if (pd_strand->wr_nwaiting > 0)
    pd_strand->wrcond.signal();

  pd_wrlocked = 0;

  if ( pd_strand->state() == giopStrand::DYING &&
       pd_strand->rd_n_justwaiting ) {
    // There are threads just blocking on the read lock's condition
    // variable and do not try to acquire the read lock. They may never
    // have a chance to discover that the strand has in fact died.
    wakeUpRdLock(pd_strand);
  }
}

////////////////////////////////////////////////////////////////////////
// Try to take the strand's read lock for this stream without blocking.
// Returns 1 and marks the stream as read-locked on success; returns 0,
// changing nothing, if the lock is currently held (rd_nwaiting < 0).
CORBA::Boolean
giopStream::rdLockNonBlocking() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  OMNIORB_ASSERT(!pd_rdlocked);

  if (pd_strand->rd_nwaiting < 0)
    return 0;
  else {
    pd_strand->rd_nwaiting = -pd_strand->rd_nwaiting - 1;
    pd_rdlocked = 1;
    return 1;
  }
}

////////////////////////////////////////////////////////////////////////
// As above, but operating directly on <strand>. No giopStream member is
// read or written, so the acquisition is not recorded in any stream's
// pd_rdlocked flag.
// NOTE(review): rdUnLock() returns early when pd_rdlocked is 0, so a
// lock taken through this overload presumably has to be handed over or
// released by some other route -- confirm against the callers.
CORBA::Boolean
giopStream::rdLockNonBlocking(giopStrand* strand) {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  if (strand->rd_nwaiting < 0)
    return 0;
  else {
    strand->rd_nwaiting = -strand->rd_nwaiting - 1;
    return 1;
  }
}

////////////////////////////////////////////////////////////////////////
// If the strand's read lock is currently held, wait once on its
// condition variable (honouring this stream's deadline, if one is set)
// and then return WITHOUT acquiring the lock. The thread registers
// itself in rd_nwaiting for the duration of the wait, using the same
// bookkeeping as rdLock(). On timeout errorOnReceive() is called.
// Returns immediately when the lock is free.
void
giopStream::sleepOnRdLock() {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  if (pd_strand->rd_nwaiting < 0) {
    // Lock is held: register as a waiter.
    pd_strand->rd_nwaiting--;

    CORBA::Boolean hastimeout = 0;

    // Now blocks.
    if (!(pd_deadline_secs || pd_deadline_nanosecs))
      pd_strand->rdcond.wait();
    else {
      hastimeout = !(pd_strand->rdcond.timedwait(pd_deadline_secs,
                                                 pd_deadline_nanosecs));
    }

    // Deregister as a waiter; the direction depends on whether the
    // lock is currently free or still held.
    if (pd_strand->rd_nwaiting >= 0)
      pd_strand->rd_nwaiting--;
    else
      pd_strand->rd_nwaiting++;

    if (hastimeout) {
      // Timeout.
      errorOnReceive(0,__FILE__,__LINE__,0,1);
    }
  }
}

////////////////////////////////////////////////////////////////////////
// Overload operating directly on <strand>. The statements below wait
// with a plain wait(), i.e. without any deadline, and use the same
// waiter bookkeeping as the member version above.
void
giopStream::sleepOnRdLock(giopStrand* strand) {

  ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);

  if (strand->rd_nwaiting < 0) {
    // Lock is held: register as a waiter.
    strand->rd_nwaiting--;
    strand->rdcond.wait();
    // Deregister as a waiter.
    if (strand->rd_nwaiting >= 0)
      strand->rd_nwaiting--;
    else
      strand->rd_nwaiting++;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -