corbamessagemediatorimpl.java

来自「JAVA 所有包」· Java 代码 · 共 2,135 行 · 第 1/5 页

JAVA
2,135
字号
		messageHeader = requestHeader = (RequestMessage) header;		setInputObject();	    } finally {		setWorkThenPoolOrResumeSelect(header);	    }	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug())		dprint(".REQUEST 1.0: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".REQUEST 1.0<-: " + header);	}    }        public void handleInput(RequestMessage_1_1 header) throws IOException    {	try {	    if (transportDebug()) dprint(".REQUEST 1.1->: " + header);	    try {		messageHeader = requestHeader = (RequestMessage) header;		setInputObject();		connection.serverRequest_1_1_Put(this);	    } finally {		setWorkThenPoolOrResumeSelect(header);	    }	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug())		dprint(".REQUEST 1.1: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".REQUEST 1.1<-: " + header);	}    }    // REVISIT: this is identical to 1_0 except for fragment part.    public void handleInput(RequestMessage_1_2 header) throws IOException    {	try {	    try {		messageHeader = requestHeader = (RequestMessage) header;		header.unmarshalRequestID(dispatchByteBuffer);		setInputObject();		if (transportDebug()) dprint(".REQUEST 1.2->: id/" 					     + header.getRequestId() 					     + ": "					     + header);	    		// NOTE: in the old code this used to be done conditionally:		// if (header.moreFragmentsToFollow()).		// Now we always put it in. We take it out when		// the response is done.		// This must happen now so if a header is fragmented the stream		// may be found.		connection.serverRequestMapPut(header.getRequestId(), this);	    } finally {		// Leader/Follower.		// Note: This *MUST* come after putting stream in above map		// since the header may be fragmented and you do not want to		// start reading again until the map above is set.		setWorkThenPoolOrResumeSelect(header);	    }	    //inputObject.unmarshalHeader(); // done in subcontract.	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug()) dprint(".REQUEST 1.2: id/"					 + header.getRequestId()					 + ": !!ERROR!!: "					 + header,					 t);	    // Mask the exception from thread.;	} finally {	    connection.serverRequestMapRemove(header.getRequestId());	    if (transportDebug()) dprint(".REQUEST 1.2<-: id/" 					 + header.getRequestId() 					 + ": "					 + header);	}    }    public void handleInput(ReplyMessage_1_0 header) throws IOException    {	try {	    try {		if (transportDebug()) dprint(".REPLY 1.0->: " + header);		messageHeader = replyHeader = (ReplyMessage) header;		setInputObject();		// REVISIT: this should be done by waiting thread.		inputObject.unmarshalHeader();		signalResponseReceived();	    } finally{		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".REPLY 1.0<-: " + header);	}    }        public void handleInput(ReplyMessage_1_1 header) throws IOException    {	try {	    if (transportDebug()) dprint(".REPLY 1.1->: " + header);	    messageHeader = replyHeader = (ReplyMessage) header;	    setInputObject();	    if (header.moreFragmentsToFollow()) {		// More fragments are coming to complete this reply, so keep		// a reference to the InputStream so we can add the fragments		connection.clientReply_1_1_Put(this);            		// In 1.1, we can't assume that we have the request ID in the		// first fragment.  Thus, another thread is used 		// to be the reader while this thread unmarshals		// the extended header and wakes up the client thread.		setWorkThenPoolOrResumeSelect(header);		// REVISIT - error handling.		// This must be done now.		inputObject.unmarshalHeader();		signalResponseReceived();	    } else {		// Not fragmented, therefore we know the request		// ID is here.  Thus, we can unmarshal the extended header		// and wake up the client thread without using a third		// thread as above.		// REVISIT - error handling during unmarshal.		// This must be done now to get the request id.		inputObject.unmarshalHeader();		signalResponseReceived();		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".REPLY 1.1<-: " + header);	}    }    public void handleInput(ReplyMessage_1_2 header) throws IOException    {	try {	    try {		messageHeader = replyHeader = (ReplyMessage) header;		// We know that the request ID is in the first fragment		header.unmarshalRequestID(dispatchByteBuffer);		if (transportDebug()) {		    dprint(".REPLY 1.2->: id/" 			   + + header.getRequestId()			   + ": more?: " + header.moreFragmentsToFollow()			   + ": " + header);		}				setInputObject();		signalResponseReceived();	    } finally {		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) dprint(".REPLY 1.2: id/"					 + header.getRequestId()					 + ": !!ERROR!!: " 					 + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".REPLY 1.2<-: id/"					 + header.getRequestId() 					 + ": "					 + header);	}    }    public void handleInput(LocateRequestMessage_1_0 header) throws IOException    {	try {	    if (transportDebug())		dprint(".LOCATE_REQUEST 1.0->: " + header);	    try {		messageHeader = header;		setInputObject();	    } finally {		setWorkThenPoolOrResumeSelect(header);	    }	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.0<-: " + header);	}    }    public void handleInput(LocateRequestMessage_1_1 header) throws IOException    {	try {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.1->: " + header);	    try {		messageHeader = header;		setInputObject();	    } finally {		setWorkThenPoolOrResumeSelect(header);	    }	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.1<-:" + header);	}    }    public void handleInput(LocateRequestMessage_1_2 header) throws IOException    {	try {	    try {		messageHeader = header;		header.unmarshalRequestID(dispatchByteBuffer);		setInputObject();		if (transportDebug()) 		    dprint(".LOCATE_REQUEST 1.2->: id/"			   + header.getRequestId() 			   + ": "			   + header);		if (header.moreFragmentsToFollow()) {		    connection.serverRequestMapPut(header.getRequestId(),this);		}	    } finally {		setWorkThenPoolOrResumeSelect(header);	    }	    getProtocolHandler().handleRequest(header, this);	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.2: id/"		       + header.getRequestId()		       + ": !!ERROR!!: " 		       + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) 		dprint(".LOCATE_REQUEST 1.2<-: id/"		       + header.getRequestId() 		       + ": "		       + header);	}    }    public void handleInput(LocateReplyMessage_1_0 header) throws IOException    {	try {	    if (transportDebug()) 		dprint(".LOCATE_REPLY 1.0->:" + header);	    try {		messageHeader = header;		setInputObject();		inputObject.unmarshalHeader(); // REVISIT Put in subcontract.		signalResponseReceived();	    } finally {		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) 		dprint(".LOCATE_REPLY 1.0<-: " + header);	}    }    public void handleInput(LocateReplyMessage_1_1 header) throws IOException    {	try {	    if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header);	    try {		messageHeader = header;		setInputObject();		// Fragmented LocateReplies are not allowed in 1.1.		inputObject.unmarshalHeader();		signalResponseReceived();	    } finally {		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header);	}    }    public void handleInput(LocateReplyMessage_1_2 header) throws IOException    {	try {	    try {		messageHeader = header;		// No need to put in client reply map - already there.		header.unmarshalRequestID(dispatchByteBuffer);		setInputObject();		if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/"					     + header.getRequestId() 					     + ": "					     + header);		signalResponseReceived();	    } finally {		setWorkThenPoolOrResumeSelect(header); // REVISIT	    }	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".LOCATE_REPLY 1.2: id/"		       + header.getRequestId()		       + ": !!ERROR!!: " 		       + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/"					 + header.getRequestId() 					 + ": "					 + header);	}    }    public void handleInput(FragmentMessage_1_1 header) throws IOException    {	try {	    if (transportDebug()) {		dprint(".FRAGMENT 1.1->: "		       + "more?: " + header.moreFragmentsToFollow()		       + ": " + header);	    }	    try {		messageHeader = header;		MessageMediator mediator = null;		CDRInputObject inputObject = null;		if (connection.isServer()) {		    mediator = connection.serverRequest_1_1_Get();		} else {		    mediator = connection.clientReply_1_1_Get();		}		if (mediator != null) {		    inputObject = (CDRInputObject) mediator.getInputObject();		}		// If no input stream available, then discard the fragment.		// This can happen:		// 1. if a fragment message is received prior to receiving		//    the original request/reply message. Very unlikely.		// 2. if a fragment message is received after the		//    reply has been sent (early replies)		// Note: In the case of early replies, the fragments received		// during the request processing (which are never unmarshaled),		// will eventually be discarded by the GC.		if (inputObject == null) {		    if (transportDebug()) 			dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header);                    // need to release dispatchByteBuffer to pool if                    // we are discarding                    releaseByteBufferToPool();		    return;		}		inputObject.getBufferManager()		    .processFragment(dispatchByteBuffer, header);		if (! header.moreFragmentsToFollow()) {		    if (connection.isServer()) {			connection.serverRequest_1_1_Remove();		    } else {			connection.clientReply_1_1_Remove();		    }		}	    } finally {		// NOTE: This *must* come after queing the fragment		// when using the selector to ensure fragments stay in order.		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header);	}    }    public void handleInput(FragmentMessage_1_2 header) throws IOException    {	try {	    try {		messageHeader = header;		// Note:  We know it's a 1.2 fragment, we have the data, but		// we need the IIOPInputStream instance to unmarshal the

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?