corbamessagemediatorimpl.java
来自「JAVA 所有包」· Java 代码 · 共 2,135 行 · 第 1/5 页
JAVA
2,135 行
messageHeader = requestHeader = (RequestMessage) header; setInputObject(); } finally { setWorkThenPoolOrResumeSelect(header); } getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".REQUEST 1.0: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".REQUEST 1.0<-: " + header); } } public void handleInput(RequestMessage_1_1 header) throws IOException { try { if (transportDebug()) dprint(".REQUEST 1.1->: " + header); try { messageHeader = requestHeader = (RequestMessage) header; setInputObject(); connection.serverRequest_1_1_Put(this); } finally { setWorkThenPoolOrResumeSelect(header); } getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".REQUEST 1.1: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".REQUEST 1.1<-: " + header); } } // REVISIT: this is identical to 1_0 except for fragment part. public void handleInput(RequestMessage_1_2 header) throws IOException { try { try { messageHeader = requestHeader = (RequestMessage) header; header.unmarshalRequestID(dispatchByteBuffer); setInputObject(); if (transportDebug()) dprint(".REQUEST 1.2->: id/" + header.getRequestId() + ": " + header); // NOTE: in the old code this used to be done conditionally: // if (header.moreFragmentsToFollow()). // Now we always put it in. We take it out when // the response is done. // This must happen now so if a header is fragmented the stream // may be found. connection.serverRequestMapPut(header.getRequestId(), this); } finally { // Leader/Follower. // Note: This *MUST* come after putting stream in above map // since the header may be fragmented and you do not want to // start reading again until the map above is set. setWorkThenPoolOrResumeSelect(header); } //inputObject.unmarshalHeader(); // done in subcontract. getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".REQUEST 1.2: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { connection.serverRequestMapRemove(header.getRequestId()); if (transportDebug()) dprint(".REQUEST 1.2<-: id/" + header.getRequestId() + ": " + header); } } public void handleInput(ReplyMessage_1_0 header) throws IOException { try { try { if (transportDebug()) dprint(".REPLY 1.0->: " + header); messageHeader = replyHeader = (ReplyMessage) header; setInputObject(); // REVISIT: this should be done by waiting thread. inputObject.unmarshalHeader(); signalResponseReceived(); } finally{ setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug())dprint(".REPLY 1.0: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".REPLY 1.0<-: " + header); } } public void handleInput(ReplyMessage_1_1 header) throws IOException { try { if (transportDebug()) dprint(".REPLY 1.1->: " + header); messageHeader = replyHeader = (ReplyMessage) header; setInputObject(); if (header.moreFragmentsToFollow()) { // More fragments are coming to complete this reply, so keep // a reference to the InputStream so we can add the fragments connection.clientReply_1_1_Put(this); // In 1.1, we can't assume that we have the request ID in the // first fragment. Thus, another thread is used // to be the reader while this thread unmarshals // the extended header and wakes up the client thread. setWorkThenPoolOrResumeSelect(header); // REVISIT - error handling. // This must be done now. inputObject.unmarshalHeader(); signalResponseReceived(); } else { // Not fragmented, therefore we know the request // ID is here. Thus, we can unmarshal the extended header // and wake up the client thread without using a third // thread as above. // REVISIT - error handling during unmarshal. // This must be done now to get the request id. inputObject.unmarshalHeader(); signalResponseReceived(); setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".REPLY 1.1: !!ERROR!!: " + header); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".REPLY 1.1<-: " + header); } } public void handleInput(ReplyMessage_1_2 header) throws IOException { try { try { messageHeader = replyHeader = (ReplyMessage) header; // We know that the request ID is in the first fragment header.unmarshalRequestID(dispatchByteBuffer); if (transportDebug()) { dprint(".REPLY 1.2->: id/" + + header.getRequestId() + ": more?: " + header.moreFragmentsToFollow() + ": " + header); } setInputObject(); signalResponseReceived(); } finally { setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".REPLY 1.2: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".REPLY 1.2<-: id/" + header.getRequestId() + ": " + header); } } public void handleInput(LocateRequestMessage_1_0 header) throws IOException { try { if (transportDebug()) dprint(".LOCATE_REQUEST 1.0->: " + header); try { messageHeader = header; setInputObject(); } finally { setWorkThenPoolOrResumeSelect(header); } getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REQUEST 1.0: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REQUEST 1.0<-: " + header); } } public void handleInput(LocateRequestMessage_1_1 header) throws IOException { try { if (transportDebug()) dprint(".LOCATE_REQUEST 1.1->: " + header); try { messageHeader = header; setInputObject(); } finally { setWorkThenPoolOrResumeSelect(header); } getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REQUEST 1.1: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REQUEST 1.1<-:" + header); } } public void handleInput(LocateRequestMessage_1_2 header) throws IOException { try { try { messageHeader = header; header.unmarshalRequestID(dispatchByteBuffer); setInputObject(); if (transportDebug()) dprint(".LOCATE_REQUEST 1.2->: id/" + header.getRequestId() + ": " + header); if (header.moreFragmentsToFollow()) { connection.serverRequestMapPut(header.getRequestId(),this); } } finally { setWorkThenPoolOrResumeSelect(header); } getProtocolHandler().handleRequest(header, this); } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REQUEST 1.2: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REQUEST 1.2<-: id/" + header.getRequestId() + ": " + header); } } public void handleInput(LocateReplyMessage_1_0 header) throws IOException { try { if (transportDebug()) dprint(".LOCATE_REPLY 1.0->:" + header); try { messageHeader = header; setInputObject(); inputObject.unmarshalHeader(); // REVISIT Put in subcontract. signalResponseReceived(); } finally { setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REPLY 1.0: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REPLY 1.0<-: " + header); } } public void handleInput(LocateReplyMessage_1_1 header) throws IOException { try { if (transportDebug()) dprint(".LOCATE_REPLY 1.1->: " + header); try { messageHeader = header; setInputObject(); // Fragmented LocateReplies are not allowed in 1.1. inputObject.unmarshalHeader(); signalResponseReceived(); } finally { setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REPLY 1.1: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REPLY 1.1<-: " + header); } } public void handleInput(LocateReplyMessage_1_2 header) throws IOException { try { try { messageHeader = header; // No need to put in client reply map - already there. header.unmarshalRequestID(dispatchByteBuffer); setInputObject(); if (transportDebug()) dprint(".LOCATE_REPLY 1.2->: id/" + header.getRequestId() + ": " + header); signalResponseReceived(); } finally { setWorkThenPoolOrResumeSelect(header); // REVISIT } } catch (Throwable t) { if (transportDebug()) dprint(".LOCATE_REPLY 1.2: id/" + header.getRequestId() + ": !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".LOCATE_REPLY 1.2<-: id/" + header.getRequestId() + ": " + header); } } public void handleInput(FragmentMessage_1_1 header) throws IOException { try { if (transportDebug()) { dprint(".FRAGMENT 1.1->: " + "more?: " + header.moreFragmentsToFollow() + ": " + header); } try { messageHeader = header; MessageMediator mediator = null; CDRInputObject inputObject = null; if (connection.isServer()) { mediator = connection.serverRequest_1_1_Get(); } else { mediator = connection.clientReply_1_1_Get(); } if (mediator != null) { inputObject = (CDRInputObject) mediator.getInputObject(); } // If no input stream available, then discard the fragment. // This can happen: // 1. if a fragment message is received prior to receiving // the original request/reply message. Very unlikely. // 2. if a fragment message is received after the // reply has been sent (early replies) // Note: In the case of early replies, the fragments received // during the request processing (which are never unmarshaled), // will eventually be discarded by the GC. if (inputObject == null) { if (transportDebug()) dprint(".FRAGMENT 1.1: ++++DISCARDING++++: " + header); // need to release dispatchByteBuffer to pool if // we are discarding releaseByteBufferToPool(); return; } inputObject.getBufferManager() .processFragment(dispatchByteBuffer, header); if (! header.moreFragmentsToFollow()) { if (connection.isServer()) { connection.serverRequest_1_1_Remove(); } else { connection.clientReply_1_1_Remove(); } } } finally { // NOTE: This *must* come after queing the fragment // when using the selector to ensure fragments stay in order. setWorkThenReadOrResumeSelect(header); } } catch (Throwable t) { if (transportDebug()) dprint(".FRAGMENT 1.1: !!ERROR!!: " + header, t); // Mask the exception from thread.; } finally { if (transportDebug()) dprint(".FRAGMENT 1.1<-: " + header); } } public void handleInput(FragmentMessage_1_2 header) throws IOException { try { try { messageHeader = header; // Note: We know it's a 1.2 fragment, we have the data, but // we need the IIOPInputStream instance to unmarshal the
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?