giopimpl10.cc
来自「编译工具」· CC 代码 · 共 1,684 行 · 第 1/4 页
CC
1,684 行
if (omniInterceptorP::serverSendException) { omniInterceptors::serverSendException_T::info_T info(giop_s, &ex); omniInterceptorP::visit(info); } // Compute and initialise the message size field { cdrCountingStream cs(g->TCS_C(),g->TCS_W(),12); giop_s.service_contexts() >>= cs; operator>>= ((CORBA::ULong)0,cs); operator>>= ((CORBA::ULong)0,cs); CORBA::ULong(repoid_size) >>= cs; cs.put_octet_array((const CORBA::Octet*) repoid, repoid_size); ex._NP_marshal(cs); outputSetMessageSize(g,cs.total()-12); *((CORBA::ULong*)(hdr + 8)) = cs.total() - 12; } // Service context giop_s.service_contexts() >>= s; // request id giop_s.requestId() >>= s; // reply status CORBA::ULong rc = GIOP::USER_EXCEPTION; rc >>= s; // user exception value CORBA::ULong(repoid_size) >>= s; s.put_octet_array((const CORBA::Octet*) repoid, repoid_size); ex._NP_marshal(s); outputMessageEnd(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl10::sendLocationForwardReply(giopStream* g,CORBA::Object_ptr obj, CORBA::Boolean permanent) { outputNewMessage(g); char* hdr = (char*)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char)GIOP::Reply; GIOP_S& giop_s = *(GIOP_S*)g; cdrStream& s = (cdrStream&) *g; // Service context operator>>= ((CORBA::ULong)0,s); // request id giop_s.requestId() >>= s; // reply status CORBA::ULong rc = GIOP::LOCATION_FORWARD; rc >>= s; // Compute and initialise the message size field { CORBA::ULong totalsz = (omni::ptr_arith_t)g->pd_outb_mkr - (omni::ptr_arith_t)hdr; cdrCountingStream cs(g->TCS_C(),g->TCS_W(),totalsz); CORBA::Object::_marshalObjRef(obj,cs); outputSetMessageSize(g,cs.total()-12); *((CORBA::ULong*)(hdr + 8)) = cs.total() - 12; } // object reference CORBA::Object::_marshalObjRef(obj,s); outputMessageEnd(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl10::sendLocateReply(giopStream* g,GIOP::LocateStatusType rc, CORBA::Object_ptr obj,CORBA::SystemException*) { outputNewMessage(g); char* hdr = (char*)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start; hdr[7] = (char)GIOP::LocateReply; GIOP_S& giop_s = *(GIOP_S*)g; cdrStream& s = (cdrStream&) *g; // request id giop_s.requestId() >>= s; CORBA::Object_ptr extra = CORBA::Object::_nil(); // reply status switch (rc) { case GIOP::UNKNOWN_OBJECT: case GIOP::OBJECT_HERE: break; case GIOP::OBJECT_FORWARD_PERM: rc = GIOP::OBJECT_FORWARD; // falls through case GIOP::OBJECT_FORWARD: extra = obj; break; default: // None of these status types are supported. rc = GIOP::UNKNOWN_OBJECT; break; } operator>>= ((CORBA::ULong)rc ,s); if (!CORBA::is_nil(extra)) { // Compute and initialise the message size field { CORBA::ULong totalsz = (omni::ptr_arith_t)g->pd_outb_mkr - (omni::ptr_arith_t)hdr; cdrCountingStream cs(g->TCS_C(),g->TCS_W(),totalsz); CORBA::Object::_marshalObjRef(extra,cs); outputSetMessageSize(g,cs.total()-12); *((CORBA::ULong*)(hdr + 8)) = cs.total() - 12; } // object reference CORBA::Object::_marshalObjRef(extra,s); } outputMessageEnd(g);}////////////////////////////////////////////////////////////////////////size_tgiopImpl10::outputRemaining(const giopStream* g) { CORBA::ULong total = g->outputMessageSize(); if (!total) { return orbParameters::giopMaxMsgSize - currentOutputPtr(g); } else { return total - currentOutputPtr(g); }}////////////////////////////////////////////////////////////////////////voidgiopImpl10::outputFlush(giopStream* g) { omni::ptr_arith_t outbuf_begin = ((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); CORBA::ULong fsz = (omni::ptr_arith_t) g->pd_outb_mkr - outbuf_begin; if (!g->outputMessageSize()) { char* hdr = (char*)outbuf_begin; CORBA::ULong msgsz = *((CORBA::ULong*)(hdr + 8)); // the header size including the request/reply header is stored in the // GIOP header's size field. if (hdr[7] == (char) GIOP::Request) { GIOP_C& giop_c = *(GIOP_C*)g; cdrCountingStream cs(g->TCS_C(),g->TCS_W(),msgsz); giop_c.calldescriptor()->marshalArguments(cs); msgsz = cs.total() - 12; } else if (hdr[7] == (char) GIOP::Reply) { GIOP_S& giop_s = *(GIOP_S*)g; cdrCountingStream cs(g->TCS_C(),g->TCS_W(),msgsz); giop_s.calldescriptor()->marshalReturnedValues(cs); msgsz = cs.total() - 12; } else { // Any other message type should never caused this function to be called. if( omniORB::trace(1) ) { omniORB::logger l; l << "Fatal error in sending message to " << g->pd_strand->connection->peeraddress() << ", invariant was violated at " << __FILE__ << ":" << __LINE__ << '\n'; } OMNIORB_ASSERT(0); // never reach here. } *((CORBA::ULong*)(hdr + 8)) = msgsz; outputSetMessageSize(g,msgsz); } if (g->outputFragmentSize()) { g->outputFragmentSize(g->outputFragmentSize()+fsz); } else { g->outputFragmentSize(fsz - 12); } g->pd_currentOutputBuffer->last = g->pd_currentOutputBuffer->start + fsz; g->sendChunk(g->pd_currentOutputBuffer); if (outbuf_begin & 0x7) { // start has previously been changed to non 8-bytes aligned g->pd_currentOutputBuffer->alignStart(omni::ALIGN_8); outbuf_begin = ((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); } g->pd_outb_mkr = (void*) outbuf_begin; g->pd_outb_end = (void*)((omni::ptr_arith_t)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->end); g->pd_currentOutputBuffer->last = g->pd_currentOutputBuffer->start;}////////////////////////////////////////////////////////////////////////voidgiopImpl10::getReserveSpace(giopStream* g,omni::alignment_t align,size_t sz) { // The caller has already checked that align == sz, or sz == 0. g->pd_outb_mkr = (void*) omni::align_to((omni::ptr_arith_t)g->pd_outb_mkr, align); if (sz == 0) return; if ((omni::ptr_arith_t)g->pd_outb_mkr < (omni::ptr_arith_t)g->pd_outb_end) { omni::ptr_arith_t newmkr = ((omni::ptr_arith_t)g->pd_outb_mkr + sz); if (newmkr <= (omni::ptr_arith_t)g->pd_outb_end) return; // Should never happen!! OMNIORB_ASSERT(0); } // Reach here only if the buffer has been filled up completely. outputFlush(g);}////////////////////////////////////////////////////////////////////////voidgiopImpl10::copyOutputData(giopStream* g,void* b, size_t sz, omni::alignment_t align) { omni::ptr_arith_t newmkr = omni::align_to((omni::ptr_arith_t)g->pd_outb_mkr, align); OMNIORB_ASSERT(newmkr <= (omni::ptr_arith_t)g->pd_outb_end); if (sz >= giopStream::directSendCutOff) { g->pd_outb_mkr = (void*)newmkr; if ( (omni::ptr_arith_t)g->pd_outb_mkr != ((omni::ptr_arith_t)g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start) ) { outputFlush(g); } // After this vector of bytes is sent, the stream may or may not be // 8 bytes aligned. But our output buffer is now emptied and hence // is 8 bytes aligned. Since we send the whole vector out, we have // to make sure that the next byte will be marshalled at the correct // alignment by adjusting the start of the currentOutputBuffer. g->sendCopyChunk(b,sz); size_t leftover = (newmkr + sz) & 0x7; if (leftover) { g->pd_currentOutputBuffer->start += leftover; g->pd_outb_mkr = (void*) ((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); } } else { g->pd_outb_mkr = (void*)newmkr; while (sz) { size_t avail = (omni::ptr_arith_t) g->pd_outb_end - (omni::ptr_arith_t) g->pd_outb_mkr; if (avail > sz) avail = sz; memcpy(g->pd_outb_mkr,b,avail); sz -= avail; g->pd_outb_mkr = (void*)((omni::ptr_arith_t) g->pd_outb_mkr + avail); b = (void*)((omni::ptr_arith_t) b + avail); if (g->pd_outb_mkr == g->pd_outb_end) outputFlush(g); } }}////////////////////////////////////////////////////////////////////////CORBA::ULonggiopImpl10::currentOutputPtr(const giopStream* g) { CORBA::ULong fsz = (omni::ptr_arith_t) g->pd_outb_mkr - ((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); if (g->outputFragmentSize()) { return fsz + g->outputFragmentSize(); } else { // subtract the header that is still in the buffer. return fsz - 12; }}////////////////////////////////////////////////////////////////////////voidgiopImpl10::outputSetMessageSize(giopStream* g,CORBA::ULong msz) { if (msz > orbParameters::giopMaxMsgSize) { char* hdr = (char*)((omni::ptr_arith_t) g->pd_currentOutputBuffer + g->pd_currentOutputBuffer->start); switch ((GIOP::MsgType)hdr[7]) { case GIOP::Request: case GIOP::LocateRequest: { // We have detected a limit error on the client side, since // we have not sent any part of the message yet, we can // safely relinquish this giopStream so that another request // can proceed. There is no need to close the connection. ((GIOP_C*)g)->state(IOP_C::Idle); omni_tracedmutex_lock sync(*omniTransportLock); g->wrUnLock(); OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient, (CORBA::CompletionStatus)g->completion()); } break; case GIOP::Reply: case GIOP::LocateReply: { // We have detected a limit error on the server side OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnServer, (CORBA::CompletionStatus)g->completion()); } break; default: OMNIORB_ASSERT(0); } } g->outputMessageSize(msz);}////////////////////////////////////////////////////////////////////////static giopStreamImpl* giop_1_0_singleton = 0;class omni_giopImpl10_initialiser : public omniInitialiser {public: void attach() { if (!giop_1_0_singleton) { GIOP::Version ver = { 1, 0}; giopStreamImpl* p; giop_1_0_singleton = p = new giopStreamImpl(ver); // Shared by the client and server side // Process message header p->outputMessageBegin = giopImpl10::outputMessageBegin; p->outputMessageEnd = giopImpl10::outputMessageEnd; p->inputMessageBegin = giopImpl10::inputMessageBegin; p->inputMessageEnd = giopImpl10::inputMessageEnd; p->sendMsgErrorMessage = giopImpl10::sendMsgErrorMessage; // Client side // Process message header p->marshalRequestHeader = giopImpl10::marshalRequestHeader; p->sendLocateRequest = giopImpl10::sendLocateRequest; p->unmarshalReplyHeader = giopImpl10::unmarshalReplyHeader; p->unmarshalLocateReply = giopImpl10::unmarshalLocateReply; // Server side // Process message header p->unmarshalWildCardRequestHeader = giopImpl10::unmarshalWildCardRequestHeader; p->unmarshalRequestHeader = giopImpl10::unmarshalRequestHeader; p->unmarshalLocateRequest = giopImpl10::unmarshalLocateRequest; p->marshalReplyHeader = giopImpl10::marshalReplyHeader; p->sendSystemException = giopImpl10::sendSystemException; p->sendUserException = giopImpl10::sendUserException; p->sendLocationForwardReply = giopImpl10::sendLocationForwardReply; p->sendLocateReply = giopImpl10::sendLocateReply; // Shared by the client and the server side // Process message body p->inputRemaining = giopImpl10::inputRemaining; p->getInputData = giopImpl10::getInputData; p->skipInputData = giopImpl10::skipInputData; p->copyInputData = giopImpl10::copyInputData; p->outputRemaining = giopImpl10::outputRemaining; p->getReserveSpace = giopImpl10::getReserveSpace; p->copyOutputData = giopImpl10::copyOutputData; p->currentInputPtr = giopImpl10::currentInputPtr; p->currentOutputPtr = giopImpl10::currentOutputPtr; giopStreamImpl::registerImpl(giop_1_0_singleton); } } void detach() { if (giop_1_0_singleton) { delete giop_1_0_singleton; giop_1_0_singleton = 0; } }};static omni_giopImpl10_initialiser initialiser;omniInitialiser& omni_giopImpl10_initialiser_ = initialiser;OMNI_NAMESPACE_END(omni)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?