corbamessagemediatorimpl.java
来自「JAVA 所有包」· Java 代码 · 共 2,135 行 · 第 1/5 页
JAVA
2,135 行
TypeCode tc = _exceptions.item(i); if ( tc.id().equals(repoId) ) { // Since we dont have the actual user exception // class, the spec says we have to create an // UnknownUserException and put it in the // environment. Any eany = orb.create_any(); eany.read_value(is, (TypeCode)tc); return new UnknownUserException(eany); } } } catch (Exception b) { throw wrapper.unexpectedDiiException(b); } // must be a truly unknown exception return wrapper.unknownCorbaExc( CompletionStatus.COMPLETED_MAYBE); } public void setDIIException(Exception exception) { diiRequest.env().exception(exception); } public void handleDIIReply(InputStream inputStream) { if (! isDIIRequest()) { return; } ((RequestImpl)diiRequest).unmarshalReply(inputStream); } public Message getDispatchHeader() { return dispatchHeader; } public void setDispatchHeader(Message msg) { dispatchHeader = msg; } public ByteBuffer getDispatchBuffer() { return dispatchByteBuffer; } public void setDispatchBuffer(ByteBuffer byteBuffer) { dispatchByteBuffer = byteBuffer; } public int getThreadPoolToUse() { int poolToUse = 0; Message msg = getDispatchHeader(); // A null msg should never happen. But, we'll be // defensive just in case. if (msg != null) { poolToUse = msg.getThreadPoolToUse(); } return poolToUse; } public byte getStreamFormatVersion() { // REVISIT: ContactInfo/Acceptor output object factories // just use this. Maybe need to distinguish: // createOutputObjectForRequest // createOutputObjectForReply // then do getStreamFormatVersionForRequest/ForReply here. if (streamFormatVersionSet) { return streamFormatVersion; } return getStreamFormatVersionForReply(); } /** * If the RMI-IIOP maximum stream format version service context * is present, it indicates the maximum stream format version we * could use for the reply. If it isn't present, the default is * 2 for GIOP 1.3 or greater, 1 for lower. * * This is only sent on requests. Clients can find out the * server's maximum by looking for a tagged component in the IOR. */ public byte getStreamFormatVersionForReply() { // NOTE: The request service contexts may indicate the max. ServiceContexts svc = getRequestServiceContexts(); MaxStreamFormatVersionServiceContext msfvsc = (MaxStreamFormatVersionServiceContext)svc.get( MaxStreamFormatVersionServiceContext.SERVICE_CONTEXT_ID); if (msfvsc != null) { byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion(); byte remoteMaxVersion = msfvsc.getMaximumStreamFormatVersion(); return (byte)Math.min(localMaxVersion, remoteMaxVersion); } else { // Defaults to 1 for GIOP 1.2 or less, 2 for // GIOP 1.3 or higher. if (getGIOPVersion().lessThan(GIOPVersion.V1_3)) return ORBConstants.STREAM_FORMAT_VERSION_1; else return ORBConstants.STREAM_FORMAT_VERSION_2; } } public boolean isSystemExceptionReply() { return replyHeader.getReplyStatus() == ReplyMessage.SYSTEM_EXCEPTION; } public boolean isUserExceptionReply() { return replyHeader.getReplyStatus() == ReplyMessage.USER_EXCEPTION; } public boolean isLocationForwardReply() { return ( (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD) || (replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD_PERM) ); //return replyHeader.getReplyStatus() == ReplyMessage.LOCATION_FORWARD; } public boolean isDifferentAddrDispositionRequestedReply() { return replyHeader.getReplyStatus() == ReplyMessage.NEEDS_ADDRESSING_MODE; } public short getAddrDispositionReply() { return replyHeader.getAddrDisposition(); } public IOR getForwardedIOR() { return replyHeader.getIOR(); } public SystemException getSystemExceptionReply() { return replyHeader.getSystemException(replyExceptionDetailMessage); } //////////////////////////////////////////////////// // // Used by server side. // public ObjectKey getObjectKey() { return getRequestHeader().getObjectKey(); } public void setProtocolHandler(CorbaProtocolHandler protocolHandler) { throw wrapper.methodShouldNotBeCalled() ; } public CorbaProtocolHandler getProtocolHandler() { // REVISIT: should look up in orb registry. return this; } //////////////////////////////////////////////////// // // ResponseHandler // public org.omg.CORBA.portable.OutputStream createReply() { // Note: relies on side-effect of setting mediator output field. // REVISIT - cast - need interface getProtocolHandler().createResponse(this, (ServiceContexts) null); return (OutputStream) getOutputObject(); } public org.omg.CORBA.portable.OutputStream createExceptionReply() { // Note: relies on side-effect of setting mediator output field. // REVISIT - cast - need interface getProtocolHandler().createUserExceptionResponse(this, (ServiceContexts) null); return (OutputStream) getOutputObject(); } public boolean executeReturnServantInResponseConstructor() { return _executeReturnServantInResponseConstructor; } public void setExecuteReturnServantInResponseConstructor(boolean b) { _executeReturnServantInResponseConstructor = b; } public boolean executeRemoveThreadInfoInResponseConstructor() { return _executeRemoveThreadInfoInResponseConstructor; } public void setExecuteRemoveThreadInfoInResponseConstructor(boolean b) { _executeRemoveThreadInfoInResponseConstructor = b; } public boolean executePIInResponseConstructor() { return _executePIInResponseConstructor; } public void setExecutePIInResponseConstructor( boolean b ) { _executePIInResponseConstructor = b; } private byte getStreamFormatVersionForThisRequest(IOR ior, GIOPVersion giopVersion) { byte localMaxVersion = ORBUtility.getMaxStreamFormatVersion(); IOR effectiveTargetIOR = ((CorbaContactInfo)this.contactInfo).getEffectiveTargetIOR(); IIOPProfileTemplate temp = (IIOPProfileTemplate)effectiveTargetIOR.getProfile().getTaggedProfileTemplate(); Iterator iter = temp.iteratorById(TAG_RMI_CUSTOM_MAX_STREAM_FORMAT.value); if (!iter.hasNext()) { // Didn't have the max stream format version tagged // component. if (giopVersion.lessThan(GIOPVersion.V1_3)) return ORBConstants.STREAM_FORMAT_VERSION_1; else return ORBConstants.STREAM_FORMAT_VERSION_2; } byte remoteMaxVersion = ((MaxStreamFormatVersionComponent)iter.next()).getMaxStreamFormatVersion(); return (byte)Math.min(localMaxVersion, remoteMaxVersion); } //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// // REVISIT - This could be a separate implementation object looked // up in a registry. However it needs some state in the message // mediator so combine for now. protected boolean isThreadDone = false; //////////////////////////////////////////////////// // // pept.protocol.ProtocolHandler // public boolean handleRequest(MessageMediator messageMediator) { try { dispatchHeader.callback(this); } catch (IOException e) { // REVISIT - this should be handled internally. ; } return isThreadDone; } //////////////////////////////////////////////////// // // iiop.messages.MessageHandler // private void setWorkThenPoolOrResumeSelect(Message header) { if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { resumeSelect(header); } else { // Leader/Follower when using reader thread. // When this thread is done working it will go back in pool. isThreadDone = true; // First unregister current registration. orb.getTransportManager().getSelector(0) .unregisterForEvent(getConnection().getEventHandler()); // Have another thread become the reader. orb.getTransportManager().getSelector(0) .registerForEvent(getConnection().getEventHandler()); } } private void setWorkThenReadOrResumeSelect(Message header) { if (getConnection().getEventHandler().shouldUseSelectThreadToWait()) { resumeSelect(header); } else { // When using reader thread then wen this thread is // done working it will continue reading. isThreadDone = false; } } private void resumeSelect(Message header) { // NOTE: VERY IMPORTANT: // Only participate in select after getting to the point // that proper serialization of fragments is ensured. if (transportDebug()) { dprint(".resumeSelect:->"); // REVISIT: not-OO: String requestId = "?"; if (header instanceof RequestMessage) { requestId = new Integer(((RequestMessage)header) .getRequestId()).toString(); } else if (header instanceof ReplyMessage) { requestId = new Integer(((ReplyMessage)header) .getRequestId()).toString(); } else if (header instanceof FragmentMessage_1_2) { requestId = new Integer(((FragmentMessage_1_2)header) .getRequestId()).toString(); } dprint(".resumeSelect: id/" + requestId + " " + getConnection() ); } // IMPORTANT: To avoid bug (4953599), we force the Thread that does the NIO select // to also do the enable/disable of Ops using SelectionKey.interestOps(Ops of Interest). // Otherwise, the SelectionKey.interestOps(Ops of Interest) may block indefinitely in // this thread. EventHandler eventHandler = getConnection().getEventHandler(); orb.getTransportManager().getSelector(0).registerInterestOps(eventHandler); if (transportDebug()) { dprint(".resumeSelect:<-"); } } private void setInputObject() { // REVISIT: refactor createInputObject (and createMessageMediator) // into base PlugInFactory. Get via connection (either ContactInfo // or Acceptor). if (getConnection().getContactInfo() != null) { inputObject = (CDRInputObject) getConnection().getContactInfo() .createInputObject(orb, this); } else if (getConnection().getAcceptor() != null) { inputObject = (CDRInputObject) getConnection().getAcceptor() .createInputObject(orb, this); } else { throw new RuntimeException("CorbaMessageMediatorImpl.setInputObject"); } inputObject.setMessageMediator(this); setInputObject(inputObject); } private void signalResponseReceived() { // This will end up using the MessageMediator associated with // the original request instead of the current mediator (which // need to be constructed to hold the dispatchBuffer and connection). connection.getResponseWaitingRoom() .responseReceived((InputObject)inputObject); } // This handles message types for which we don't create classes. public void handleInput(Message header) throws IOException { try { messageHeader = header; if (transportDebug()) dprint(".handleInput->: " + MessageBase.typeToString(header.getType())); setWorkThenReadOrResumeSelect(header); switch(header.getType()) { case Message.GIOPCloseConnection: if (transportDebug()) { dprint(".handleInput: CloseConnection: purging"); } connection.purgeCalls(wrapper.connectionRebind(), true, false); break; case Message.GIOPMessageError: if (transportDebug()) { dprint(".handleInput: MessageError: purging"); } connection.purgeCalls(wrapper.recvMsgError(), true, false); break; default: if (transportDebug()) { dprint(".handleInput: ERROR: " + MessageBase.typeToString(header.getType())); } throw wrapper.badGiopRequestType() ; } releaseByteBufferToPool(); } finally { if (transportDebug()) { dprint(".handleInput<-: " + MessageBase.typeToString(header.getType())); } } } public void handleInput(RequestMessage_1_0 header) throws IOException { try { if (transportDebug()) dprint(".REQUEST 1.0->: " + header); try {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?