corbamessagemediatorimpl.java
来自「JAVA 所有包」· Java 代码 · 共 2,135 行 · 第 1/5 页
JAVA
2,135 行
// request ID... but we need the request ID to get the // IIOPInputStream instance. So we peek at the raw bytes. header.unmarshalRequestID(dispatchByteBuffer); if (transportDebug()) { dprint(".FRAGMENT 1.2->: id/" + header.getRequestId() + ": more?: " + header.moreFragmentsToFollow() + ": " + header); } MessageMediator mediator = null; InputObject inputObject = null; if (connection.isServer()) { mediator = connection.serverRequestMapGet(header.getRequestId()); } else { mediator = connection.clientRequestMapGet(header.getRequestId()); } if (mediator != null) { inputObject = mediator.getInputObject(); } // See 1.1 comments. if (inputObject == null) { if (transportDebug()) { dprint(".FRAGMENT 1.2: id/" + header.getRequestId() + ": ++++DISCARDING++++: " + header); } // need to release dispatchByteBuffer to pool if // we are discarding releaseByteBufferToPool(); return; } ((CDRInputObject)inputObject) .getBufferManager().processFragment( dispatchByteBuffer, header); // REVISIT: but if it is a server don't you have to remove the // stream from the map? if (! connection.isServer()) { /* REVISIT * No need to do anything. * Should we mark that last was received? if (! header.moreFragmentsToFollow()) { // Last fragment. } */ } } finally { // NOTE: This *must* come after queing the fragment // when using the selector to ensure fragments stay in order. setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".FRAGMENT 1.2: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".FRAGMENT 1.2<-: id/" + header.getRequestId() + ": " + header); } } public void handleInput(CancelRequestMessage header) throws IOException { try { try { messageHeader = header; setInputObject(); // REVISIT: Move these two to subcontract. inputObject.unmarshalHeader(); if (transportDebug()) dprint(".CANCEL->: id/" + header.getRequestId() + ": " + header.getGIOPVersion() + ": " + header); processCancelRequest(header.getRequestId()); releaseByteBufferToPool(); } finally { setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".CANCEL: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".CANCEL<-: id/" + header.getRequestId() + ": " + header.getGIOPVersion() + ": " + header); } } private void throwNotImplemented() { isThreadDone = false; throwNotImplemented(""); } private void throwNotImplemented(String msg) { throw new RuntimeException("CorbaMessageMediatorImpl: not implemented " + msg); } private void dprint(String msg, Throwable t) { dprint(msg); t.printStackTrace(System.out); } private void dprint(String msg) { ORBUtility.dprint("CorbaMessageMediatorImpl", msg); } protected String opAndId(CorbaMessageMediator mediator) { return ORBUtility.operationNameAndRequestId(mediator); } private boolean transportDebug() { return orb.transportDebugFlag; } // REVISIT: move this to subcontract (but both client and server need it). private final void processCancelRequest(int cancelReqId) { // The GIOP version of CancelRequest does not matter, since // CancelRequest_1_0 could be sent to cancel a request which // has a different GIOP version. /* * CancelRequest processing logic : * * - find the request with matching requestId * * - call cancelProcessing() in BufferManagerRead [BMR] * * - the hope is that worker thread would call BMR.underflow() * to wait for more fragments to come in. When BMR.underflow() is * called, if a CancelRequest had already arrived, * the worker thread would throw ThreadDeath, * else the thread would wait to be notified of the * arrival of a new fragment or CancelRequest. Upon notification, * the woken up thread would check to see if a CancelRequest had * arrived and if so throw a ThreadDeath or it will continue to * process the received fragment. * * - if all the fragments had been received prior to CancelRequest * then the worker thread would never block in BMR.underflow(). * So, setting the abort flag in BMR has no effect. The request * processing will complete normally. * * - in the case where the server has received enough fragments to * start processing the request and the server sends out * an early reply. In such a case if the CancelRequest arrives * after the reply has been sent, it has no effect. */ if (!connection.isServer()) { return; // we do not support bi-directional giop yet, ignore. } // Try to get hold of the InputStream buffer. // In the case of 1.0 requests there is no way to get hold of // InputStream. Try out the 1.1 and 1.2 cases. // was the request 1.2 ? MessageMediator mediator = connection.serverRequestMapGet(cancelReqId); int requestId ; if (mediator == null) { // was the request 1.1 ? mediator = connection.serverRequest_1_1_Get(); if (mediator == null) { // XXX log this! // either the request was 1.0 // or an early reply has already been sent // or request processing is over // or its a spurious CancelRequest return; // do nothing. } requestId = ((CorbaMessageMediator) mediator).getRequestId(); if (requestId != cancelReqId) { // A spurious 1.1 CancelRequest has been received. // XXX log this! return; // do nothing } if (requestId == 0) { // special case // XXX log this // this means that // 1. the 1.1 requests' requestId has not been received // i.e., a CancelRequest was received even before the // 1.1 request was received. The spec disallows this. // 2. or the 1.1 request has a requestId 0. // // It is a little tricky to distinguish these two. So, be // conservative and do not cancel the request. Downside is that // 1.1 requests with requestId of 0 will never be cancelled. return; // do nothing } } else { requestId = ((CorbaMessageMediator) mediator).getRequestId(); } Message msg = ((CorbaMessageMediator)mediator).getRequestHeader(); if (msg.getType() != Message.GIOPRequest) { // Any mediator obtained here should only ever be for a GIOP // request. wrapper.badMessageTypeForCancel() ; } // At this point we have a valid message mediator that contains // a valid requestId. // at this point we have chosen a request to be cancelled. But we // do not know if the target object's method has been invoked or not. // Request input stream being available simply means that the request // processing is not over yet. simply set the abort flag in the // BMRS and hope that the worker thread would notice it (this can // happen only if the request stream is being unmarshalled and the // target's method has not been invoked yet). This guarantees // that the requests which have been dispatched to the // target's method will never be cancelled. BufferManagerReadStream bufferManager = (BufferManagerReadStream) ((CDRInputObject)mediator.getInputObject()).getBufferManager(); bufferManager.cancelProcessing(cancelReqId); } //////////////////////////////////////////////////// // // spi.protocol.CorbaProtocolHandler // public void handleRequest(RequestMessage msg, CorbaMessageMediator messageMediator) { try { beginRequest(messageMediator); try { handleRequestRequest(messageMediator); if (messageMediator.isOneWay()) { return; } } catch (Throwable t) { if (messageMediator.isOneWay()) { return; } handleThrowableDuringServerDispatch( messageMediator, t, CompletionStatus.COMPLETED_MAYBE); } sendResponse(messageMediator); } catch (Throwable t) { dispatchError(messageMediator, "RequestMessage", t); } finally { endRequest(messageMediator); } } public void handleRequest(LocateRequestMessage msg, CorbaMessageMediator messageMediator) { try { beginRequest(messageMediator); try { handleLocateRequest(messageMediator); } catch (Throwable t) { handleThrowableDuringServerDispatch( messageMediator, t, CompletionStatus.COMPLETED_MAYBE); } sendResponse(messageMediator); } catch (Throwable t) { dispatchError(messageMediator, "LocateRequestMessage", t); } finally { endRequest(messageMediator); } } private void beginRequest(CorbaMessageMediator messageMediator) { ORB orb = (ORB) messageMediator.getBroker(); if (orb.subcontractDebugFlag) { dprint(".handleRequest->:"); } connection.serverRequestProcessingBegins(); } private void dispatchError(CorbaMessageMediator messageMediator, String msg, Throwable t) { if (orb.subcontractDebugFlag) { dprint(".handleRequest: " + opAndId(messageMediator) + ": !!ERROR!!: " + msg, t); } // REVISIT - this makes hcks sendTwoObjects fail // messageMediator.getConnection().close(); } private void sendResponse(CorbaMessageMediator messageMediator) { if (orb.subcontractDebugFlag) { dprint(".handleRequest: " + opAndId(messageMediator) + ": sending response"); } // REVISIT - type and location CDROutputObject outputObject = (CDROutputObject) messageMediator.getOutputObject(); if (outputObject != null) { // REVISIT - can be null for TRANSIENT below. outputObject.finishSendingMessage(); } } private void endRequest(CorbaMessageMediator messageMediator) { ORB orb = (ORB) messageMediator.getBroker(); if (orb.subcontractDebugFlag) { dprint(".handleRequest<-: " + opAndId(messageMediator)); } // release NIO ByteBuffers to ByteBufferPool try { OutputObject outputObj = messageMediator.getOutputObject(); if (outputObj != null) { outputObj.close(); } InputObject inputObj = messageMediator.getInputObject(); if (inputObj != null) { inputObj.close(); } } catch (IOException ex) { // Given what close() does, this catch shouldn't ever happen. // See CDRInput/OutputObject.close() for more info. // It also won't result in a Corba error if an IOException happens. if (orb.subcontractDebugFlag) { dprint(".endRequest: IOException:" + ex.getMessage(), ex); } } finally { ((CorbaConnection)messageMediator.getConnection()).serverRequestProcessingEnds(); } } protected void handleRequestRequest(CorbaMessageMediator messageMediator) { // Does nothing if already unmarshaled. ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader(); ORB orb = (ORB)messageMediator.getBroker(); orb.checkShutdownState(); ObjectKey okey = messageMediator.getObjectKey(); if (orb.subcontractDebugFlag) { ObjectKeyTemplate oktemp = okey.getTemplate() ; dprint( ".handleRequest: " + opAndId(messageMediator) + ": dispatching to scid: " + oktemp.getSubcontractId()); } CorbaServerRequestDispatcher sc = okey.getServerRequestDispatcher(orb); if (orb.subcontractDebugFlag) { dprint(".handleRequest: " + opAndId(messageMediator) + ": dispatching to sc: " + sc); } if (sc == null) { throw wrapper.noServerScInDispatch() ; } // NOTE: // This is necessary so mediator can act as ResponseHandler // and pass necessary info to response constructors located // in the subcontract. // REVISIT - same class right now. //messageMediator.setProtocolHandler(this); try { orb.startingDispatch(); sc.dispatch(messageMediator); } finally { orb.finishedDispatch(); } } protected void handleLocateRequest(CorbaMessageMediator messageMediator) { ORB orb = (ORB)messageMediator.getBroker(); LocateRequestMessage msg = (LocateRequestMessage) messageMediator.getDispatchHeader(); IOR ior = null; LocateReplyMessage reply = null; short addrDisp = -1; try { ((CDRInputObject)messageMediator.getInputObject()).unmarshalHeader(); CorbaServerRequestDispatcher sc = msg.getObjectKey().getServerRequestDispatcher( orb ) ; if (sc == null) { return; } ior = sc.locate(msg.getObjectKey()); if ( ior == null ) { reply = MessageBase.createLocateReply( orb, msg.getGIOPVersion(), msg.getEncodingVersion(),
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?