📄 giopstream.cc
字号:
}}////////////////////////////////////////////////////////////////////////voidgiopStream::sleepOnRdLockAlways() { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); if (pd_strand->rd_nwaiting < 0) pd_strand->rd_nwaiting--; else pd_strand->rd_nwaiting++; pd_strand->rd_n_justwaiting++; CORBA::Boolean hastimeout = 0; // Now blocks. if (!(pd_deadline_secs || pd_deadline_nanosecs)) pd_strand->rdcond.wait(); else { hastimeout = !(pd_strand->rdcond.timedwait(pd_deadline_secs, pd_deadline_nanosecs)); } pd_strand->rd_n_justwaiting--; if (pd_strand->rd_nwaiting >= 0) pd_strand->rd_nwaiting--; else pd_strand->rd_nwaiting++; if (hastimeout) { // Timeout. errorOnReceive(0,__FILE__,__LINE__,0,1); }}////////////////////////////////////////////////////////////////////////voidgiopStream::wakeUpRdLock(giopStrand* strand) { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1);#if 1 strand->rdcond.broadcast();#else // Do this if the platform's condition variable broadcast does not // work. int i = strand->rd_nwaiting; if (i < 0) i = -1; for ( ; i > 0; i--) strand->rdcond.signal();#endif}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::noLockWaiting(giopStrand* strand) { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); return ((strand->rd_nwaiting == 0) && (strand->wr_nwaiting == 0));}////////////////////////////////////////////////////////////////////////voidgiopStream::markRdLock() { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); OMNIORB_ASSERT(pd_rdlocked == 0); pd_rdlocked = 1;}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::RdLockIsHeld(giopStrand* strand) { ASSERT_OMNI_TRACEDMUTEX_HELD(*omniTransportLock,1); return ((strand->rd_nwaiting != 0));}////////////////////////////////////////////////////////////////////////voidgiopStream::notifyCommFailure(CORBA::Boolean, CORBA::ULong& minor, CORBA::Boolean& retry){ minor = 0; retry = 0;}////////////////////////////////////////////////////////////////////////voidgiopStream::CommFailure::_raise(CORBA::ULong minor, CORBA::CompletionStatus status, CORBA::Boolean retry, const char* filename, CORBA::ULong linenumber){ if (status != CORBA::COMPLETED_NO) retry = 0;#ifndef OMNIORB_NO_EXCEPTION_LOGGING if( omniORB::traceExceptions ) { omniORB::logger l; l << "throw giopStream::CommFailure from " << omniExHelper::strip(filename) << ":" << linenumber << "(" << (int)retry << ","; const char* description = minorCode2String(TRANSIENT_LookupTable,minor); if (!description) description = minorCode2String(COMM_FAILURE_LookupTable,minor); if (description) l << omniORB::logger::exceptionStatus(status,description); else l << omniORB::logger::exceptionStatus(status,minor); l << ")\n"; }#endif throw CommFailure(minor,status,retry,filename,linenumber);}////////////////////////////////////////////////////////////////////////voidgiopStream::get_octet_array(CORBA::Octet* b,int size,omni::alignment_t align) { OMNIORB_ASSERT(impl()); impl()->copyInputData(this,b,size,align);}////////////////////////////////////////////////////////////////////////voidgiopStream::skipInput(CORBA::ULong size) { OMNIORB_ASSERT(impl()); impl()->skipInputData(this,size);}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::checkInputOverrun(CORBA::ULong itemSize, CORBA::ULong nItems, omni::alignment_t align) { OMNIORB_ASSERT(impl()); size_t avail = impl()->inputRemaining(this); omni::ptr_arith_t p1 = omni::align_to((omni::ptr_arith_t)pd_inb_mkr,align); p1 += itemSize*nItems; if (avail < (size_t)(p1 - (omni::ptr_arith_t)pd_inb_mkr)) return 0; else return 1;}////////////////////////////////////////////////////////////////////////voidgiopStream::fetchInputData(omni::alignment_t align,size_t required) { OMNIORB_ASSERT(impl()); OMNIORB_ASSERT(required == 0 || (required <= 8 && ((size_t)align == required))); impl()->getInputData(this,align,required);}////////////////////////////////////////////////////////////////////////voidgiopStream::put_octet_array(const CORBA::Octet* b, int size, omni::alignment_t align) { OMNIORB_ASSERT(impl()); impl()->copyOutputData(this,(void*)b,size,align);}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::checkOutputOverrun(CORBA::ULong itemSize, CORBA::ULong nItems, omni::alignment_t align) { OMNIORB_ASSERT(impl()); size_t avail = impl()->outputRemaining(this); if (avail != ULONG_MAX) { omni::ptr_arith_t p1=omni::align_to((omni::ptr_arith_t)pd_outb_mkr,align); p1 += itemSize*nItems; if (avail < (size_t)(p1 - (omni::ptr_arith_t)pd_outb_mkr)) return 0; } return 1;}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::reserveOutputSpaceForPrimitiveType(omni::alignment_t align, size_t required) { OMNIORB_ASSERT(impl()); OMNIORB_ASSERT(required == 0 || (required <= 8 && ((size_t)align == required))); impl()->getReserveSpace(this,align,required); // Either we have the space or we've thrown an exception in getReserveSpace. return 1;}////////////////////////////////////////////////////////////////////////CORBA::BooleangiopStream::maybeReserveOutputSpace(omni::alignment_t align, size_t required) { OMNIORB_ASSERT(impl()); if (required > 8 || ((size_t)align != required)) { OMNIORB_THROW(BAD_PARAM,BAD_PARAM_InternalInvariant, (CORBA::CompletionStatus)completion()); } impl()->getReserveSpace(this,align,required); // Either we have the space or we've thrown an exception in getReserveSpace. return 1;}////////////////////////////////////////////////////////////////////////CORBA::ULonggiopStream::currentInputPtr() const { OMNIORB_ASSERT(impl()); return impl()->currentInputPtr(this);}////////////////////////////////////////////////////////////////////////CORBA::ULonggiopStream::currentOutputPtr() const { OMNIORB_ASSERT(impl()); return impl()->currentOutputPtr(this);}////////////////////////////////////////////////////////////////////////giopStream_Buffer* giopStream_Buffer::newBuffer(CORBA::ULong sz) { if (!sz) sz = giopStream::bufferSize; // giopStream_Buffer* b = (giopStream_Buffer*) // (new char[sz + sizeof(giopStream_Buffer) + 8]); char* p = new char[sz + sizeof(giopStream_Buffer) + 8]; giopStream_Buffer* b = (giopStream_Buffer*)p; b->alignStart(omni::ALIGN_8); b->end = b->start + sz; b->last = b->start; b->size = 0; b->next = 0; return b;}////////////////////////////////////////////////////////////////////////void giopStream_Buffer::deleteBuffer(giopStream_Buffer* b) { char* p = (char*)b; delete [] p;}////////////////////////////////////////////////////////////////////////void giopStream_Buffer::alignStart(omni::alignment_t align) { omni::ptr_arith_t p = omni::align_to((omni::ptr_arith_t)this + sizeof(giopStream_Buffer),align); start = p - (omni::ptr_arith_t) this;}////////////////////////////////////////////////////////////////////////voidgiopStream::releaseInputBuffer(giopStream_Buffer* p) { if (!pd_rdlocked || pd_strand->spare || (p->end - p->start) < giopStream::bufferSize ) { char* c = (char*)p; delete [] c; return; } p->next = pd_strand->spare; pd_strand->spare = p;}////////////////////////////////////////////////////////////////////////voidgiopStream::errorOnReceive(int rc, const char* filename, CORBA::ULong lineno, giopStream_Buffer* buf,CORBA::Boolean heldlock) { CORBA::ULong minor; CORBA::Boolean retry; notifyCommFailure(heldlock,minor,retry); if (rc == 0) { // Timeout. // We do not use the return code from the function. if (buf && buf->end != buf->start) { // partially received a buffer, we assume the other end // is in serious trouble. pd_strand->state(giopStrand::DYING); } retry = 0; minor = TRANSIENT_CallTimedout; } else { pd_strand->state(giopStrand::DYING); } if (buf) giopStream_Buffer::deleteBuffer(buf); CommFailure::_raise(minor,(CORBA::CompletionStatus)completion(),retry, filename,lineno); // never reaches here.}////////////////////////////////////////////////////////////////////////CORBA::ULonggiopStream::ensureSaneHeader(const char* filename, CORBA::ULong lineno, giopStream_Buffer* buf, CORBA::ULong begin) { CORBA::ULong minor; CORBA::Boolean retry; char* hdr = (char*)buf + begin; if (hdr[0] != 'G' || hdr[1] != 'I' || hdr[2] != 'O' || hdr[3] != 'P') { // Terrible! This is not a GIOP header. pd_strand->state(giopStrand::DYING); notifyCommFailure(0,minor,retry); giopStream_Buffer::deleteBuffer(buf); CommFailure::_raise(minor,(CORBA::CompletionStatus)completion(),retry, filename,lineno); // never reaches here. } // Get the message size from the buffer CORBA::ULong msz; // check for 8 byte alignment if (((long)hdr & 7) == 0) msz = *(CORBA::ULong*)(hdr + 8); else memcpy(&msz, hdr + 8, sizeof(CORBA::ULong)); if ((hdr[6] & 0x1) != _OMNIORB_HOST_BYTE_ORDER_) { CORBA::ULong bsz = msz; msz = ((((bsz) & 0xff000000) >> 24) | (((bsz) & 0x00ff0000) >> 8) | (((bsz) & 0x0000ff00) << 8) | (((bsz) & 0x000000ff) << 24)); } return msz + 12;}////////////////////////////////////////////////////////////////////////giopStream_Buffer*giopStream::inputMessage() { OMNIORB_ASSERT(pd_rdlocked); if (pd_strand->state() == giopStrand::DYING) { CORBA::ULong minor; CORBA::Boolean retry; notifyCommFailure(0,minor,retry); CORBA::CompletionStatus status; if (pd_strand->orderly_closed) { status = CORBA::COMPLETED_NO; } else { status = (CORBA::CompletionStatus)completion(); } CommFailure::_raise(minor,status,retry,__FILE__,__LINE__); // never reaches here. } giopStream_Buffer* buf; if (pd_strand->head) { buf = pd_strand->head; pd_strand->head = buf->next; buf->next = 0; } else if (pd_strand->spare) { buf = pd_strand->spare; pd_strand->spare = buf->next; buf->next = 0; buf->last = buf->start; } else { buf = giopStream_Buffer::newBuffer(); } while ((buf->last - buf->start) < 12) { int rsz = pd_strand->connection->Recv((void*) ((omni::ptr_arith_t)buf+buf->last), (size_t) (buf->end - buf->last), pd_deadline_secs, pd_deadline_nanosecs); if (rsz > 0) { buf->last += rsz; } else { errorOnReceive(rsz,__FILE__,__LINE__,buf,0); // never reaches here.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -