giopimpl12.cc
来自「编译工具」· CC 代码 · 共 2,182 行 · 第 1/5 页
CC
2,182 行
omniTransportLock->unlock(); if (matched_target) { if (matched_target == g) { OMNIORB_ASSERT(g->pd_input == 0); if (g->pd_currentInputBuffer) { g->releaseInputBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } g->pd_input = b; return; } OMNIORB_ASSERT(matched_target->pd_currentInputBuffer == 0); giopStream_Buffer** pp = &matched_target->pd_input; while (*pp) { pp = &((*pp)->next); } *pp = b; CORBA::ULong fetchsz = b->size - (b->last - b->start); while (fetchsz) { // fetch the rest of the message; giopStream_Buffer* p = g->inputChunk(fetchsz); pp = &((*pp)->next); *pp = p; fetchsz -= (p->last - p->start); } CORBA::Boolean isfull = ((hdr[6] & 0x2) ? 0 : 1); if (mtype == GIOP::CancelRequest) isfull = 1; if (isfull) { omni_tracedmutex_lock sync(*omniTransportLock); matched_target->inputFullyBuffered(isfull); if (!matched_target_is_client) { ((GIOP_S*)matched_target)->state(IOP_S::InputFullyBuffered); omniORB::logs(40, "Changed GIOP_S to InputFullyBuffered"); if (!g->pd_strand->isClient()) { g->pd_strand->server->notifyCallFullyBuffered(g->pd_strand->connection); } } giopStream::wakeUpRdLock(g->pd_strand); } return; } // reach here if we have no match. { CORBA::ULong fetchsz = b->size - (b->last - b->start); giopStream_Buffer::deleteBuffer(b); while (fetchsz) { // fetch the rest of the message; b = g->inputChunk(fetchsz); fetchsz -= (b->last - b->start); giopStream_Buffer::deleteBuffer(b); } }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputNewServerMessage(giopStream* g) { OMNIORB_ASSERT(g->pd_currentInputBuffer == 0); g->pd_currentInputBuffer = g->inputMessage(); unsigned char* hdr = (unsigned char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; if (hdr[4] != 1 || hdr[5] > 2 || hdr[7] > (unsigned char) GIOP::Fragment) { inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } switch ((GIOP::MsgType)hdr[7]) { case GIOP::Request: case GIOP::LocateRequest: case GIOP::MessageError: case GIOP::CloseConnection: return; case GIOP::Reply: case GIOP::LocateReply: if (g->pd_strand->biDir) { break; } else { inputTerminalProtocolError(g, __FILE__, __LINE__); // Never reach here. } case GIOP::Fragment: case GIOP::CancelRequest: break; } // reach here if the message is destined to some other call in progress. giopStream_Buffer* p = g->pd_currentInputBuffer; g->pd_currentInputBuffer = 0; inputQueueMessage(g,p);}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputNewFragment(giopStream* g) { if (g->pd_currentInputBuffer) { g->releaseInputBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } again: if (!g->pd_input) { giopStream_Buffer* p = g->inputMessage(); inputQueueMessage(g,p); goto again; } else { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_currentInputBuffer->next; g->pd_currentInputBuffer->next = 0; } char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; if (hdr[7] == GIOP::CancelRequest) { if (g->pd_strand->biDir || !g->pd_strand->isClient()) { throw GIOP_S::terminateProcessing(); } else { inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here. } } CORBA::Boolean bswap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_) ? 0 : 1 ); if (bswap != g->pd_unmarshal_byte_swap) { inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here } g->pd_inb_mkr = (void*)(hdr + 16); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0)); g->inputMessageSize(g->inputMessageSize() + g->pd_currentInputBuffer->size - 16); g->inputFragmentToCome(g->pd_currentInputBuffer->size - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start));}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputReplyBegin(giopStream* g, void (*unmarshalHeader)(giopStream*)) { { omni_tracedmutex_lock sync(*omniTransportLock); if (!g->pd_strand->biDir) { while (!(g->inputFullyBuffered() || g->pd_rdlocked)) { if (!g->rdLockNonBlocking()) { g->sleepOnRdLock(); } } } else { // If the strand is used for bidirectional GIOP, we // let the server thread to read the incoming message and to // demultiplex any reply back to us. Therefore, we do not // try to acquire the read lock but just sleep on the read // lock instead. while (!(g->pd_strand->state() == giopStrand::DYING || g->inputFullyBuffered() ) ) { OMNIORB_ASSERT(g->pd_rdlocked == 0); g->sleepOnRdLockAlways(); } if (g->pd_strand->state() == giopStrand::DYING) { CORBA::ULong minor; CORBA::Boolean retry; g->notifyCommFailure(1,minor,retry); CORBA::CompletionStatus status; if (g->pd_strand->orderly_closed) { status = CORBA::COMPLETED_NO; } else { status = (CORBA::CompletionStatus)g->completion(); } giopStream::CommFailure::_raise(minor, status, retry, __FILE__,__LINE__); // never reaches here. } } } if (!g->pd_currentInputBuffer) { again: if (!g->pd_input) { giopStream_Buffer* p = g->inputMessage(); inputQueueMessage(g,p); goto again; } else { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_input->next; g->pd_currentInputBuffer->next = 0; } } char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_) ? 0 : 1 ); g->pd_inb_mkr = (void*)(hdr + 16); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0)); g->inputMessageSize(g->pd_currentInputBuffer->size); g->inputFragmentToCome(g->pd_currentInputBuffer->size - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start)); unmarshalHeader(g); if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) { OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient, CORBA::COMPLETED_YES); }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputMessageBegin(giopStream* g, void (*unmarshalHeader)(giopStream*)) { if (unmarshalHeader != unmarshalWildCardRequestHeader) { inputReplyBegin(g,unmarshalHeader); return; } { omni_tracedmutex_lock sync(*omniTransportLock); if (!(g->inputFullyBuffered() || g->pd_rdlocked)) { g->rdLock(); } } again: if (!g->pd_currentInputBuffer) { if (g->pd_input) { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_input->next; g->pd_currentInputBuffer->next = 0; } else { inputNewServerMessage(g); goto again; } } char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; if (hdr[5] <= 1) { // This is a GIOP 1.0 or 1.1 message, switch to the implementation // and dispatch again. GIOP::Version v; v.major = 1; v.minor = hdr[5]; ((giopStrand &)*g).version = v; g->impl(giopStreamImpl::matchVersion(v)); OMNIORB_ASSERT(g->impl()); g->impl()->inputMessageBegin(g,g->impl()->unmarshalWildCardRequestHeader); return; } g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_) ? 0 : 1 ); g->pd_inb_mkr = (void*)(hdr + 12); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0)); g->inputMessageSize(g->pd_currentInputBuffer->size); g->inputFragmentToCome(g->pd_currentInputBuffer->size - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start)); unmarshalHeader(g); if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) { OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnServer, CORBA::COMPLETED_NO); }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputSkipWholeMessage(giopStream* g) { do { if (g->pd_currentInputBuffer) { giopStream_Buffer::deleteBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } if (g->inputFragmentToCome()) { if (!g->pd_input) { g->pd_currentInputBuffer = g->inputChunk(g->inputFragmentToCome()); } else { g->pd_currentInputBuffer = g->pd_input; g->pd_input = g->pd_currentInputBuffer->next; g->pd_currentInputBuffer->next = 0; } g->pd_inb_mkr = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start); g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + g->pd_currentInputBuffer->last); g->inputFragmentToCome(g->inputFragmentToCome() - (g->pd_currentInputBuffer->last - g->pd_currentInputBuffer->start)); } else if (g->inputExpectAnotherFragment()) { inputNewFragment(g); } else { break; } } while (1); g->pd_inb_mkr = g->pd_inb_end;}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputMessageEnd(giopStream* g,CORBA::Boolean disgard) { if ( g->pd_strand->state() != giopStrand::DYING ) { while ( g->inputExpectAnotherFragment() && g->inputFragmentToCome() == 0 && g->pd_inb_end == g->pd_inb_mkr ) { // If there are more fragments to come and we do not have any // data left in our buffer, we keep fetching the next // fragment until one of the conditions is false. // This will cater for the case where the remote end is sending // the last fragment(s) with 0 body size to indicate the end of // a message. inputNewFragment(g); } if (!disgard && inputRemaining(g)) { if (omniORB::trace(15)) { omniORB::logger l; l << "Garbage left at the end of input message from " << g->pd_strand->connection->peeraddress() << "\n"; } if (!orbParameters::strictIIOP) { disgard = 1; } else { inputTerminalProtocolError(g, __FILE__, __LINE__); // never reach here. } } if (disgard) inputSkipWholeMessage(g); if (g->pd_currentInputBuffer) { g->releaseInputBuffer(g->pd_currentInputBuffer); g->pd_currentInputBuffer = 0; } } if (g->pd_rdlocked) { omni_tracedmutex_lock sync(*omniTransportLock); g->rdUnLock(); }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::unmarshalReplyHeader(giopStream* g) { char* hdr = (char*)g->pd_currentInputBuffer + g->pd_currentInputBuffer->start; if ((GIOP::MsgType)hdr[7] != GIOP::Reply) { // Unexpected reply. The other end is terribly confused. Drop the // connection and died. inputTerminalProtocolError(g, __FILE__, __LINE__); // Never reach here. } GIOP_C& giop_c = *((GIOP_C*) g); cdrStream& s = *((cdrStream*)g); // We have already verified the request id in the header and the stream // have been setup to go pass it CORBA::ULong v; v <<= s; switch (v) { case GIOP::SYSTEM_EXCEPTION: case GIOP::NO_EXCEPTION: case GIOP::USER_EXCEPTION: case GIOP::LOCATION_FORWARD: case GIOP::LOCATION_FORWARD_PERM: case GIOP::NEEDS_ADDRESSING_MODE: break; default: // Should never receive anything other that the above
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?