giopimpl12.cc

来自「编译工具」· CC 代码 · 共 2,182 行 · 第 1/5 页

CC
2,182
字号
  omniTransportLock->unlock();  if (matched_target) {    if (matched_target == g) {      OMNIORB_ASSERT(g->pd_input == 0);      if (g->pd_currentInputBuffer) {	g->releaseInputBuffer(g->pd_currentInputBuffer);	g->pd_currentInputBuffer = 0;      }      g->pd_input = b;      return;    }    OMNIORB_ASSERT(matched_target->pd_currentInputBuffer == 0);    giopStream_Buffer** pp = &matched_target->pd_input;    while (*pp) {      pp = &((*pp)->next);    }    *pp = b;    CORBA::ULong fetchsz = b->size - (b->last - b->start);    while (fetchsz) {      // fetch the rest of the message;      giopStream_Buffer* p = g->inputChunk(fetchsz);      pp = &((*pp)->next);      *pp = p;      fetchsz -= (p->last - p->start);    }          CORBA::Boolean isfull = ((hdr[6] & 0x2) ? 0 : 1);    if (mtype == GIOP::CancelRequest) isfull = 1;    if (isfull) {      omni_tracedmutex_lock sync(*omniTransportLock);      matched_target->inputFullyBuffered(isfull);      if (!matched_target_is_client) {	((GIOP_S*)matched_target)->state(IOP_S::InputFullyBuffered);	omniORB::logs(40, "Changed GIOP_S to InputFullyBuffered");	if (!g->pd_strand->isClient()) {	  g->pd_strand->server->notifyCallFullyBuffered(g->pd_strand->connection);	}      }      giopStream::wakeUpRdLock(g->pd_strand);    }    return;  }  // reach here if we have no match.  {    CORBA::ULong fetchsz = b->size - (b->last - b->start);    giopStream_Buffer::deleteBuffer(b);    while (fetchsz) {      // fetch the rest of the message;      b = g->inputChunk(fetchsz);      fetchsz -= (b->last - b->start);      giopStream_Buffer::deleteBuffer(b);    }  }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputNewServerMessage(giopStream* g) {  OMNIORB_ASSERT(g->pd_currentInputBuffer == 0);  g->pd_currentInputBuffer = g->inputMessage();  unsigned char* hdr = (unsigned char*)g->pd_currentInputBuffer +                                        g->pd_currentInputBuffer->start;  if (hdr[4] != 1 || hdr[5] > 2 || hdr[7] > (unsigned char) GIOP::Fragment) {    inputTerminalProtocolError(g, __FILE__, __LINE__);    // never reach here  }  switch ((GIOP::MsgType)hdr[7]) {  case GIOP::Request:  case GIOP::LocateRequest:  case GIOP::MessageError:  case GIOP::CloseConnection:    return;  case GIOP::Reply:  case GIOP::LocateReply:    if (g->pd_strand->biDir) {      break;    }    else {      inputTerminalProtocolError(g, __FILE__, __LINE__);      // Never reach here.    }  case GIOP::Fragment:  case GIOP::CancelRequest:    break;  }  // reach here if the message is destined to some other call in progress.  giopStream_Buffer* p = g->pd_currentInputBuffer;  g->pd_currentInputBuffer = 0;  inputQueueMessage(g,p);}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputNewFragment(giopStream* g) {  if (g->pd_currentInputBuffer) {    g->releaseInputBuffer(g->pd_currentInputBuffer);    g->pd_currentInputBuffer = 0;  } again:  if (!g->pd_input) {    giopStream_Buffer* p = g->inputMessage();    inputQueueMessage(g,p);    goto again;  }  else {    g->pd_currentInputBuffer = g->pd_input;    g->pd_input = g->pd_currentInputBuffer->next;    g->pd_currentInputBuffer->next = 0;  }  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  if (hdr[7] == GIOP::CancelRequest) {    if (g->pd_strand->biDir || !g->pd_strand->isClient()) {      throw GIOP_S::terminateProcessing();    }    else {      inputTerminalProtocolError(g, __FILE__, __LINE__);      // never reach here.    }  }  CORBA::Boolean bswap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_)			  ? 0 : 1 );  if (bswap != g->pd_unmarshal_byte_swap) {    inputTerminalProtocolError(g, __FILE__, __LINE__);    // never reach here  }  g->pd_inb_mkr = (void*)(hdr + 16);  g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			  g->pd_currentInputBuffer->last);  g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0));  g->inputMessageSize(g->inputMessageSize() + 		      g->pd_currentInputBuffer->size - 16);  g->inputFragmentToCome(g->pd_currentInputBuffer->size - 			 (g->pd_currentInputBuffer->last -			  g->pd_currentInputBuffer->start));}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputReplyBegin(giopStream* g, 			    void (*unmarshalHeader)(giopStream*)) {  {    omni_tracedmutex_lock sync(*omniTransportLock);    if (!g->pd_strand->biDir) {      while (!(g->inputFullyBuffered() || g->pd_rdlocked)) {	if (!g->rdLockNonBlocking()) {	  g->sleepOnRdLock();	}      }    }    else {      // If the strand is used for bidirectional GIOP, we      // let the server thread to read the incoming message and to      // demultiplex any reply back to us. Therefore, we do not      // try to acquire the read lock but just sleep on the read      // lock instead.      while (!(g->pd_strand->state() == giopStrand::DYING ||	       g->inputFullyBuffered() ) ) {	OMNIORB_ASSERT(g->pd_rdlocked == 0);	g->sleepOnRdLockAlways();      }      if (g->pd_strand->state() == giopStrand::DYING) {	CORBA::ULong minor;	CORBA::Boolean retry;	g->notifyCommFailure(1,minor,retry);	CORBA::CompletionStatus status;	if (g->pd_strand->orderly_closed) {	  status = CORBA::COMPLETED_NO;	}	else {	  status = (CORBA::CompletionStatus)g->completion();	}	giopStream::CommFailure::_raise(minor,					status,					retry,					__FILE__,__LINE__);	// never reaches here.      }    }  }  if (!g->pd_currentInputBuffer) {  again:    if (!g->pd_input) {      giopStream_Buffer* p = g->inputMessage();      inputQueueMessage(g,p);      goto again;    }    else {      g->pd_currentInputBuffer = g->pd_input;      g->pd_input = g->pd_input->next;      g->pd_currentInputBuffer->next = 0;    }  }  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_)			       ? 0 : 1 );  g->pd_inb_mkr = (void*)(hdr + 16);  g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			  g->pd_currentInputBuffer->last);  g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0));  g->inputMessageSize(g->pd_currentInputBuffer->size);  g->inputFragmentToCome(g->pd_currentInputBuffer->size - 			 (g->pd_currentInputBuffer->last -			  g->pd_currentInputBuffer->start));  unmarshalHeader(g);  if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) {    OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnClient,		  CORBA::COMPLETED_YES);  }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputMessageBegin(giopStream* g,			      void (*unmarshalHeader)(giopStream*)) {  if (unmarshalHeader != unmarshalWildCardRequestHeader) {    inputReplyBegin(g,unmarshalHeader);    return;  }  {    omni_tracedmutex_lock sync(*omniTransportLock);    if (!(g->inputFullyBuffered() || g->pd_rdlocked)) {      g->rdLock();    }  } again:  if (!g->pd_currentInputBuffer) {    if (g->pd_input) {      g->pd_currentInputBuffer = g->pd_input;      g->pd_input = g->pd_input->next;      g->pd_currentInputBuffer->next = 0;    }    else {      inputNewServerMessage(g);      goto again;    }  }  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  if (hdr[5] <= 1) {    // This is a GIOP 1.0 or 1.1 message, switch to the implementation    // and dispatch again.    GIOP::Version v;    v.major = 1;    v.minor = hdr[5];    ((giopStrand &)*g).version = v;    g->impl(giopStreamImpl::matchVersion(v));    OMNIORB_ASSERT(g->impl());    g->impl()->inputMessageBegin(g,g->impl()->unmarshalWildCardRequestHeader);    return;  }  g->pd_unmarshal_byte_swap = (((hdr[6] & 0x1) == _OMNIORB_HOST_BYTE_ORDER_)			       ? 0 : 1 );  g->pd_inb_mkr = (void*)(hdr + 12);  g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			  g->pd_currentInputBuffer->last);  g->inputExpectAnotherFragment(((hdr[6] & 0x2) ? 1 : 0));  g->inputMessageSize(g->pd_currentInputBuffer->size);  g->inputFragmentToCome(g->pd_currentInputBuffer->size - 			 (g->pd_currentInputBuffer->last -			  g->pd_currentInputBuffer->start));  unmarshalHeader(g);  if (g->inputMessageSize() > orbParameters::giopMaxMsgSize) {    OMNIORB_THROW(MARSHAL,MARSHAL_MessageSizeExceedLimitOnServer,		  CORBA::COMPLETED_NO);  }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputSkipWholeMessage(giopStream* g) {  do {    if (g->pd_currentInputBuffer) {      giopStream_Buffer::deleteBuffer(g->pd_currentInputBuffer);      g->pd_currentInputBuffer = 0;    }    if (g->inputFragmentToCome()) {      if (!g->pd_input) {	g->pd_currentInputBuffer = g->inputChunk(g->inputFragmentToCome());      }      else {	g->pd_currentInputBuffer = g->pd_input;	g->pd_input = g->pd_currentInputBuffer->next;	g->pd_currentInputBuffer->next = 0;      }      g->pd_inb_mkr = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			      g->pd_currentInputBuffer->start);      g->pd_inb_end = (void*)((omni::ptr_arith_t)g->pd_currentInputBuffer + 			      g->pd_currentInputBuffer->last);      g->inputFragmentToCome(g->inputFragmentToCome() - 			     (g->pd_currentInputBuffer->last -			      g->pd_currentInputBuffer->start));    }    else if (g->inputExpectAnotherFragment()) {      inputNewFragment(g);    }    else {      break;    }  } while (1);  g->pd_inb_mkr = g->pd_inb_end;}////////////////////////////////////////////////////////////////////////voidgiopImpl12::inputMessageEnd(giopStream* g,CORBA::Boolean disgard) {  if ( g->pd_strand->state() != giopStrand::DYING ) {    while ( g->inputExpectAnotherFragment() &&	    g->inputFragmentToCome() == 0   && 	    g->pd_inb_end == g->pd_inb_mkr     ) {      // If there are more fragments to come and we do not have any      // data left in our buffer, we keep fetching the next      // fragment until one of the conditions is false.      // This will cater for the case where the remote end is sending      // the last fragment(s) with 0 body size to indicate the end of      // a message.      inputNewFragment(g);    }    if (!disgard && inputRemaining(g)) {      if (omniORB::trace(15)) {	omniORB::logger l;	l << "Garbage left at the end of input message from "	  << g->pd_strand->connection->peeraddress() << "\n";      }      if (!orbParameters::strictIIOP) {	disgard = 1;      }      else {	inputTerminalProtocolError(g, __FILE__, __LINE__);	// never reach here.      }    }    if (disgard)      inputSkipWholeMessage(g);    if (g->pd_currentInputBuffer) {      g->releaseInputBuffer(g->pd_currentInputBuffer);      g->pd_currentInputBuffer = 0;    }  }  if (g->pd_rdlocked) {    omni_tracedmutex_lock sync(*omniTransportLock);    g->rdUnLock();  }}////////////////////////////////////////////////////////////////////////voidgiopImpl12::unmarshalReplyHeader(giopStream* g) {  char* hdr = (char*)g->pd_currentInputBuffer +                      g->pd_currentInputBuffer->start;  if ((GIOP::MsgType)hdr[7] != GIOP::Reply) {    // Unexpected reply. The other end is terribly confused. Drop the    // connection and died.    inputTerminalProtocolError(g, __FILE__, __LINE__);    // Never reach here.  }  GIOP_C& giop_c = *((GIOP_C*) g);  cdrStream& s = *((cdrStream*)g);  // We have already verified the request id in the header and the stream  // have been setup to go pass it  CORBA::ULong v;  v <<= s;  switch (v) {  case GIOP::SYSTEM_EXCEPTION:  case GIOP::NO_EXCEPTION:  case GIOP::USER_EXCEPTION:  case GIOP::LOCATION_FORWARD:  case GIOP::LOCATION_FORWARD_PERM:  case GIOP::NEEDS_ADDRESSING_MODE:    break;  default:    // Should never receive anything other that the above

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?