📄 giopimpl11.cc
字号:
(omni::ptr_arith_t) g->pd_inb_mkr; if (avail >= sz) { avail = sz; } if (avail) { if (b) memcpy(b,g->pd_inb_mkr,avail); g->pd_inb_mkr = (void*)((omni::ptr_arith_t)g->pd_inb_mkr + avail); } sz -= avail; if (b) b = (void*)((omni::ptr_arith_t)b + avail); if (!sz) break; if (g->pd_inb_mkr == g->pd_inb_end) { if (g->inputFragmentToCome() < sz && !g->inputExpectAnotherFragment()) { if (!g->inputMatchedId()) { // Because this error occurs when the id of the reply has not // been established. We have to treat this as an error on the // connection. Other threads that are reading from this connection // will notice this as well. g->pd_strand->state(giopStrand::DYING); } OMNIORB_THROW(MARSHAL,MARSHAL_PassEndOfMessage, (CORBA::CompletionStatus)g->completion()); // never reach here } if (!g->inputFragmentToCome()) { inputNewFragment(g); if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) { OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimit, (CORBA::CompletionStatus)g->completion()); } continue; } if (g->inputMatchedId()) { if (g->pd_currentInputBuffer) { g->releaseInputBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } if (g->pd_input) { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_currentInputBuffer->next; g->pd_currentInputBuffer->next = 0; } else { if ( b && sz >= giopStream::directReceiveCutOff ) { CORBA::ULong transz = g->inputFragmentToCome(); if (transz > sz) transz = sz; transz = (transz >> 3) << 3; g->inputCopyChunk(b,transz); sz -= transz; b = (void*)((omni::ptr_arith_t)b + transz); g->inputFragmentToCome(g->inputFragmentToCome() - transz); continue; } else { g->pd_currentInputBuffer = g->inputChunk(g->inputFragmentToCome()); } } } else { // We keep the buffer around until the id of the reply is established. if (g->pd_currentInputBuffer) { giopStream_Buffer** pp = &g->pd_input; while (*pp) { pp = &((*pp)->next); } *pp = g->pd_currentInputBuffer; g->pd_currentInputBuffer = 0; g->pd_currentInputBuffer = g->inputChunk(g->inputFragmentToCome()); } } g->pd_inb_mkr = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputFragmentToCome(g->inputFragmentToCome() - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start)); } } }////////////////////////////////////////////////////////////////////////CORBA::ULonggiopImpl11::currentInputPtr(const giopStream* g) { return g->inputMessageSize() - g->inputFragmentToCome() - ((omni::ptr_arith_t) g->pd_inb_end - (omni::ptr_arith_t) g->pd_inb_mkr);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::inputTerminalProtocolError(giopStream* g) { // XXX We may choose to send a message error to the other end. if (omniORB::trace(1)) { omniORB::logger l; l << "From endpoint: " << g->pd_strand->connection->peeraddress() <<". Detected GIOP 1.1 protocol error in input message. " << "Connection is closed.\n"; } inputRaiseCommFailure(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::inputRaiseCommFailure(giopStream* g) { CORBA::ULong minor; CORBA::Boolean retry; g->notifyCommFailure(0,minor,retry); g->pd_strand->state(giopStrand::DYING); giopStream::CommFailure::_raise(minor, (CORBA::CompletionStatus)g->completion(), 0,__FILE__,__LINE__);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::outputNewMessage(giopStream* g) { if (!g->pd_wrlocked) { omni_tracedmutex_lock sync(*omniTransportLock); g->wrLock(); } if (!g->pd_currentOutputBuffer) { g->pd_currentOutputBuffer = giopStream_Buffer::newBuffer(); } g->pd_currentOutputBuffer->alignStart(omni::ALIGN_8); char* hdr = (char*)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[0] = 'G'; hdr[1] = 'I'; hdr[2] = 'O'; hdr[3] = 'P'; hdr[4] = 1; hdr[5] = 1; hdr[6] = _OMNIORB_HOST_BYTE_ORDER_; g->pd_outb_mkr = (void*)(hdr + 12); g->pd_outb_end = (void*)((omni::ptr_arith_t)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->end); g->outputFragmentSize(0); g->outputMessageSize(0);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::outputMessageBegin(giopStream* g, void (*marshalHeader)(giopStream*)) { outputNewMessage(g); marshalHeader(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::outputMessageEnd(giopStream* g) { if (g->pd_currentOutputBuffer) { omni::ptr_arith_t outbuf_begin = ((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); if ( outbuf_begin != (omni::ptr_arith_t)g->pd_outb_mkr ) { if (!g->outputFragmentSize()) { CORBA::ULong sz = (omni::ptr_arith_t)g->pd_outb_mkr - outbuf_begin -12; *((CORBA::ULong*)(outbuf_begin + 8)) = sz; g->outputMessageSize(g->outputMessageSize()+sz); } g->pd_currentOutputBuffer->last = (omni::ptr_arith_t) g->pd_outb_mkr - (omni::ptr_arith_t) g->pd_currentOutputBuffer; g->sendChunk(g->pd_currentOutputBuffer); } // Notice that we do not release the buffer. Next time this giopStream // is re-used, the buffer will be reused as well. } { omni_tracedmutex_lock sync(*omniTransportLock); g->wrUnLock(); }}////////////////////////////////////////////////////////////////////////voidgiopImpl11::sendMsgErrorMessage(giopStream* g) { if (!g->pd_wrlocked) { omni_tracedmutex_lock sync(*omniTransportLock); g->wrLock(); } if (omniORB::trace(1)) { omniORB::logger l; l << "To endpoint: " << g->pd_strand->connection->peeraddress() <<". Send GIOP 1.1 MessageError because a protocol error has been detected. " << "Connection is closed.\n"; } if (!g->pd_currentOutputBuffer) { g->pd_currentOutputBuffer = giopStream_Buffer::newBuffer(); } g->pd_currentOutputBuffer->alignStart(omni::ALIGN_8); char* hdr = (char*)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[0] = 'G'; hdr[1] = 'I'; hdr[2] = 'O'; hdr[3] = 'P'; hdr[4] = 1; hdr[5] = 1; hdr[6] = _OMNIORB_HOST_BYTE_ORDER_; g->pd_outb_mkr = (void*)(hdr + 12); g->pd_outb_end = (void*)((omni::ptr_arith_t)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->end); g->outputFragmentSize(0); g->outputMessageSize(0); hdr[7] = (char)GIOP::MessageError; hdr[8] = hdr[9] = hdr[10] = hdr[11] = 0; (void) g->pd_strand->connection->Send(hdr,12); g->pd_strand->state(giopStrand::DYING); { omni_tracedmutex_lock sync(*omniTransportLock); g->wrUnLock(); }}////////////////////////////////////////////////////////////////////////voidgiopImpl11::marshalRequestHeader(giopStream* g) { char* hdr = (char*) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char) GIOP::Request; GIOP_C& giop_c = *(GIOP_C*)g; cdrStream& s = (cdrStream&) *g; omniCallDescriptor& calldesc = *giop_c.calldescriptor(); CORBA::Boolean response_expected = !calldesc.is_oneway(); omniInterceptors::clientSendRequest_T::info_T info(*g, *(giop_c.ior()), calldesc.op(), !response_expected, response_expected); omniInterceptorP::visit(info); // service context info.service_contexts >>= s; // request id giop_c.requestId() >>= s; // response expected flag s.marshalBoolean(response_expected); // object key giop_c.keysize() >>= s; s.put_octet_array(giop_c.key(),giop_c.keysize()); // operation operator>>= ((CORBA::ULong) calldesc.op_len(),s); s.put_octet_array((CORBA::Octet*) calldesc.op(), calldesc.op_len()); // principal omni::myPrincipalID >>= s;}////////////////////////////////////////////////////////////////////////voidgiopImpl11::sendLocateRequest(giopStream* g) { outputNewMessage(g); char* hdr = (char*) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char)GIOP::LocateRequest; GIOP_C& giop_c = *(GIOP_C*)g; cdrStream& s = (cdrStream&) *g; // Compute and initialise the message size field { cdrCountingStream cs(g->TCS_C(),g->TCS_W(),12); operator>>= ((CORBA::ULong)0,cs); giop_c.keysize() >>= cs; cs.put_octet_array(giop_c.key(),giop_c.keysize()); outputSetFragmentSize(g,cs.total()-12); *((CORBA::ULong*)(hdr + 8)) = cs.total() - 12; } // request id giop_c.requestId() >>= s; // object key giop_c.keysize() >>= s; s.put_octet_array(giop_c.key(),giop_c.keysize()); outputMessageEnd(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::marshalReplyHeader(giopStream* g) { char* hdr = (char*) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char) GIOP::Reply; GIOP_S& giop_s = *(GIOP_S*)g; cdrStream& s = (cdrStream&) *g; CORBA::ULong rc = GIOP::NO_EXCEPTION; { // calculate the reply header size cdrCountingStream cs(g->TCS_C(),g->TCS_W(),12); giop_s.service_contexts() >>= cs; operator>>= ((CORBA::ULong)0,cs); rc >>= cs; *((CORBA::ULong*)(hdr+8)) = cs.total(); } // Service context operator>>= ((CORBA::ULong)0,s); // request id giop_s.requestId() >>= s; // reply status rc >>= s;}////////////////////////////////////////////////////////////////////////voidgiopImpl11::sendSystemException(giopStream* g,const CORBA::SystemException& ex) { GIOP_S& giop_s = *(GIOP_S*)g; cdrStream& s = (cdrStream&) *g; if (giop_s.state() == GIOP_S::ReplyIsBeingComposed) { // This system exception is raised during the marshalling of the reply. // We cannot marshal the exception. Can only indicate that something // fatal about this request. sendMsgErrorMessage(g); CORBA::ULong minor; CORBA::Boolean retry; giop_s.notifyCommFailure(0,minor,retry); giopStream::CommFailure::_raise(minor,(CORBA::CompletionStatus) giop_s.completion(), retry,__FILE__,__LINE__); } int repoid_size; const char* repoid = ex._NP_repoId(&repoid_size); outputNewMessage(g); char* hdr = (char*) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char) GIOP::Reply; giop_s.service_contexts().length(0); if (omniInterceptorP::serverSendException) { omniInterceptors::serverSendException_T::info_T info(giop_s, &ex); omniInterceptorP::visit(info); if (giop_s.service_contexts().length() > 0) { // Compute and initialise the message size field. Only necessary // if there are service contexts, since we know a message without // service contexts will fit in a single buffer. cdrCountingStream cs(g->TCS_C(),g->TCS_W(),12); giop_s.service_contexts() >>= cs; operator>>= ((CORBA::ULong)0,cs); operator>>= ((CORBA::ULong)0,cs); CORBA::ULong(repoid_size) >>= cs; cs.put_octet_array((const CORBA::Octet*) repoid, repoid_size); ex.minor() >>= cs; operator>>= ((CORBA::ULong)0,cs); outputSetFragmentSize(g,cs.total()-12); *((CORBA::ULong*)(hdr + 8)) = cs.total() - 12; } } // Service context giop_s.service_contexts() >>= s; // request id giop_s.requestId() >>= s; // reply status CORBA::ULong rc = GIOP::SYSTEM_EXCEPTION; rc >>= s; // system exception value CORBA::ULong(repoid_size) >>= s; s.put_octet_array((const CORBA::Octet*) repoid, repoid_size); ex.minor() >>= s; CORBA::ULong(ex.completed()) >>= s; outputMessageEnd(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl11::sendUserException(giopStream* g,const CORBA::UserException& ex) { GIOP_S& giop_s = *(GIOP_S*)g; cdrStream& s = (cdrStream&) *g; int i, repoid_size; const char* repoid = ex._NP_repoId(&repoid_size); outputNewMessage(g); char* hdr = (char*)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char)GIOP::Reply; giop_s.service_contexts().length(0); if (omniInterceptorP::serverSendException) { omniInterceptors::serverSendException_T::info_T info(giop_s, &ex); omniInterceptorP::visit(info); } // user exception value
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -