corbamessagemediatorimpl.java

来自「JAVA 所有包」· Java 代码 · 共 2,135 行 · 第 1/5 页

JAVA
2,135
字号
		// request ID... but we need the request ID to get the		// IIOPInputStream instance. So we peek at the raw bytes.		header.unmarshalRequestID(dispatchByteBuffer);		if (transportDebug()) {		    dprint(".FRAGMENT 1.2->: id/"			   + header.getRequestId()			   + ": more?: " + header.moreFragmentsToFollow()			   + ": " + header);		}		MessageMediator mediator = null;		InputObject inputObject = null;		if (connection.isServer()) {		    mediator =			connection.serverRequestMapGet(header.getRequestId());		} else {		    mediator = 			connection.clientRequestMapGet(header.getRequestId());		}		if (mediator != null) {		    inputObject = mediator.getInputObject();		}		// See 1.1 comments.		if (inputObject == null) {		    if (transportDebug()) {			dprint(".FRAGMENT 1.2: id/"			       + header.getRequestId()			       + ": ++++DISCARDING++++: "			       + header);		    }                    // need to release dispatchByteBuffer to pool if                    // we are discarding                    releaseByteBufferToPool();		    return;		}		((CDRInputObject)inputObject)		    .getBufferManager().processFragment(                                     dispatchByteBuffer, header);		// REVISIT: but if it is a server don't you have to remove the		// stream from the map?		if (! connection.isServer()) {		    /* REVISIT		     * No need to do anything.		     * Should we mark that last was received?		     if (! header.moreFragmentsToFollow()) {		     // Last fragment.		     }		    */		}	    } finally {		// NOTE: This *must* come after queing the fragment		// when using the selector to ensure fragments stay in order.		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) 		dprint(".FRAGMENT 1.2: id/"		       + header.getRequestId()		       + ": !!ERROR!!: " 		       + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/"					 + header.getRequestId() 					 + ": "					 + header);	}    }    public void handleInput(CancelRequestMessage header) throws IOException    {	try {	    try {		messageHeader = header;		setInputObject();		// REVISIT: Move these two to subcontract.		inputObject.unmarshalHeader();		if (transportDebug()) dprint(".CANCEL->: id/" 					     + header.getRequestId() + ": "					     + header.getGIOPVersion() + ": "					     + header);		processCancelRequest(header.getRequestId());                releaseByteBufferToPool();	    } finally {		setWorkThenReadOrResumeSelect(header);	    }	} catch (Throwable t) {	    if (transportDebug()) dprint(".CANCEL: id/"					 + header.getRequestId()					 + ": !!ERROR!!: " 					 + header, t);	    // Mask the exception from thread.;	} finally {	    if (transportDebug()) dprint(".CANCEL<-: id/" 					 + header.getRequestId() + ": "					 + header.getGIOPVersion() + ": "					 + header);	}    }        private void throwNotImplemented()    {	isThreadDone = false;	throwNotImplemented("");    }    private void throwNotImplemented(String msg)    {	throw new RuntimeException("CorbaMessageMediatorImpl: not implemented " + msg);    }    private void dprint(String msg, Throwable t)     {	dprint(msg);	t.printStackTrace(System.out);    }    private void dprint(String msg)     {	ORBUtility.dprint("CorbaMessageMediatorImpl", msg);    }    protected String opAndId(CorbaMessageMediator mediator)    {	return ORBUtility.operationNameAndRequestId(mediator);    }    private boolean transportDebug()    {	return orb.transportDebugFlag;    }    // REVISIT: move this to subcontract (but both client and server need it).    private final void processCancelRequest(int cancelReqId) {        // The GIOP version of CancelRequest does not matter, since        // CancelRequest_1_0 could be sent to cancel a request which        // has a different GIOP version.        /*         * CancelRequest processing logic :         *         *  - find the request with matching requestId         *         *  - call cancelProcessing() in BufferManagerRead [BMR]         *         *  - the hope is that worker thread would call BMR.underflow()         *    to wait for more fragments to come in. When BMR.underflow() is         *    called, if a CancelRequest had already arrived,  	 *    the worker thread would throw ThreadDeath,         *    else the thread would wait to be notified of the         *    arrival of a new fragment or CancelRequest. Upon notification,         *    the woken up thread would check to see if a CancelRequest had         *    arrived and if so throw a ThreadDeath or it will continue to         *    process the received fragment.         *         *  - if all the fragments had been received prior to CancelRequest         *    then the worker thread would never block in BMR.underflow().         *    So, setting the abort flag in BMR has no effect. The request         *    processing will complete normally.         *         *  - in the case where the server has received enough fragments to 	 *    start processing the request and the server sends out 	 *    an early reply. In such a case if the CancelRequest arrives 	 *    after the reply has been sent, it has no effect.         */        if (!connection.isServer()) {	    return; // we do not support bi-directional giop yet, ignore.        }        // Try to get hold of the InputStream buffer.        // In the case of 1.0 requests there is no way to get hold of        // InputStream. Try out the 1.1 and 1.2 cases.        // was the request 1.2 ?	MessageMediator mediator = connection.serverRequestMapGet(cancelReqId);	int requestId ;        if (mediator == null) { 	    // was the request 1.1 ?	    mediator = connection.serverRequest_1_1_Get();            if (mediator == null) {		// XXX log this!                // either the request was 1.0                // or an early reply has already been sent                // or request processing is over                // or its a spurious CancelRequest                return; // do nothing.            }	    requestId = ((CorbaMessageMediator) mediator).getRequestId();            if (requestId != cancelReqId) {                // A spurious 1.1 CancelRequest has been received.		// XXX log this!                return; // do nothing            }	    if (requestId == 0) { // special case		// XXX log this		// this means that		// 1. the 1.1 requests' requestId has not been received		//    i.e., a CancelRequest was received even before the		//    1.1 request was received. The spec disallows this.		// 2. or the 1.1 request has a requestId 0.		//		// It is a little tricky to distinguish these two. So, be		// conservative and do not cancel the request. Downside is that		// 1.1 requests with requestId of 0 will never be cancelled.		return; // do nothing	    }	} else {	    requestId = ((CorbaMessageMediator) mediator).getRequestId();	}	Message msg = ((CorbaMessageMediator)mediator).getRequestHeader();	if (msg.getType() != Message.GIOPRequest) {	    // Any mediator obtained here should only ever be for a GIOP	    // request.	    wrapper.badMessageTypeForCancel() ;		}	// At this point we have a valid message mediator that contains	// a valid requestId.        // at this point we have chosen a request to be cancelled. But we        // do not know if the target object's method has been invoked or not.        // Request input stream being available simply means that the request        // processing is not over yet. simply set the abort flag in the        // BMRS and hope that the worker thread would notice it (this can        // happen only if the request stream is being unmarshalled and the        // target's method has not been invoked yet). This guarantees        // that the requests which have been dispatched to the        // target's method will never be cancelled.        BufferManagerReadStream bufferManager = (BufferManagerReadStream)	    ((CDRInputObject)mediator.getInputObject()).getBufferManager();        bufferManager.cancelProcessing(cancelReqId);    }    ////////////////////////////////////////////////////    //    // spi.protocol.CorbaProtocolHandler    //    public void handleRequest(RequestMessage msg,			      CorbaMessageMediator messageMediator)    {	try {	    beginRequest(messageMediator);	    try {		handleRequestRequest(messageMediator);		if (messageMediator.isOneWay()) {		    return;		}	    } catch (Throwable t) {		if (messageMediator.isOneWay()) {		    return;		}		handleThrowableDuringServerDispatch(                    messageMediator, t, CompletionStatus.COMPLETED_MAYBE);	    }	    sendResponse(messageMediator);        } catch (Throwable t) {	    dispatchError(messageMediator, "RequestMessage", t);	} finally {	    endRequest(messageMediator);	}    }    public void handleRequest(LocateRequestMessage msg,			      CorbaMessageMediator messageMediator)    {	try {	    beginRequest(messageMediator);	    try {		handleLocateRequest(messageMediator);	    } catch (Throwable t) {		handleThrowableDuringServerDispatch(	            messageMediator, t, CompletionStatus.COMPLETED_MAYBE);	    }	    sendResponse(messageMediator);        } catch (Throwable t) {	    dispatchError(messageMediator, "LocateRequestMessage", t);	} finally {	    endRequest(messageMediator);	}    }    private void beginRequest(CorbaMessageMediator messageMediator)    {	ORB orb = (ORB) messageMediator.getBroker();	if (orb.subcontractDebugFlag) {	    dprint(".handleRequest->:");	}	connection.serverRequestProcessingBegins();    }    private void dispatchError(CorbaMessageMediator messageMediator,			       String msg, Throwable t)    {	if (orb.subcontractDebugFlag) {	    dprint(".handleRequest: " + opAndId(messageMediator) 		   + ": !!ERROR!!: "		   + msg,		   t);	}	// REVISIT - this makes hcks sendTwoObjects fail	// messageMediator.getConnection().close();    }    private void sendResponse(CorbaMessageMediator messageMediator)    {	if (orb.subcontractDebugFlag) {	    dprint(".handleRequest: " + opAndId(messageMediator) 		   + ": sending response");	}	// REVISIT - type and location	CDROutputObject outputObject = (CDROutputObject)	    messageMediator.getOutputObject();	if (outputObject != null) {	    // REVISIT - can be null for TRANSIENT below.	    outputObject.finishSendingMessage();	}    }    private void endRequest(CorbaMessageMediator messageMediator)    {	ORB orb = (ORB) messageMediator.getBroker();	if (orb.subcontractDebugFlag) {	    dprint(".handleRequest<-: " + opAndId(messageMediator));	}        // release NIO ByteBuffers to ByteBufferPool        try {            OutputObject outputObj = messageMediator.getOutputObject();            if (outputObj != null) {		outputObj.close();            }            InputObject inputObj = messageMediator.getInputObject();            if (inputObj != null) {		inputObj.close();            }        } catch (IOException ex) {            // Given what close() does, this catch shouldn't ever happen.            // See CDRInput/OutputObject.close() for more info.            // It also won't result in a Corba error if an IOException happens.	    if (orb.subcontractDebugFlag) {                dprint(".endRequest: IOException:" + ex.getMessage(), ex);	    }        } finally {	    ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds();	}    }    protected void handleRequestRequest(CorbaMessageMediator messageMediator)    {	// Does nothing if already unmarshaled.	((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();        ORB orb = (ORB)messageMediator.getBroker();	orb.checkShutdownState();	ObjectKey okey = messageMediator.getObjectKey();        if (orb.subcontractDebugFlag) {	    ObjectKeyTemplate oktemp = okey.getTemplate() ;	    dprint( ".handleRequest: " + opAndId(messageMediator)		    + ": dispatching to scid: " + oktemp.getSubcontractId());	}	CorbaServerRequestDispatcher sc = okey.getServerRequestDispatcher(orb);	if (orb.subcontractDebugFlag) {	    dprint(".handleRequest: " + opAndId(messageMediator)		   + ": dispatching to sc: " + sc);	}	if (sc == null) {	    throw wrapper.noServerScInDispatch() ;	}	// NOTE:	// This is necessary so mediator can act as ResponseHandler	// and pass necessary info to response constructors located	// in the subcontract.	// REVISIT - same class right now.	//messageMediator.setProtocolHandler(this);        try {            orb.startingDispatch();	    sc.dispatch(messageMediator);        } finally {            orb.finishedDispatch();        }    }    protected void handleLocateRequest(CorbaMessageMediator messageMediator)    {	ORB orb = (ORB)messageMediator.getBroker();	LocateRequestMessage msg = (LocateRequestMessage)	    messageMediator.getDispatchHeader();	IOR ior = null;	LocateReplyMessage reply = null;	short addrDisp = -1; 	try {	    ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader();	    CorbaServerRequestDispatcher sc = 		msg.getObjectKey().getServerRequestDispatcher( orb ) ;	    if (sc == null) {		return;	    }	    ior = sc.locate(msg.getObjectKey());	    if ( ior == null ) {		reply = MessageBase.createLocateReply(		            orb, msg.getGIOPVersion(),			    msg.getEncodingVersion(), 

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?