📄 httpmessageservlet.java
字号:
} messenger = new HttpServletMessenger(owner.servletHttpTransport.group.getPeerGroupID(), localAddress, currentRequest.requestorAddr, messengerAliveFor); boolean taken = owner.messengerReadyEvent(messenger, currentRequest.destAddr); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Incoming messenger to: " + currentRequest.requestorAddr + " taken=" + taken); } if (!taken) { // nobody cares. Just destroy it. messenger.close(); messenger = null; } } // We may later decide that contentLength should not be set after all // if we use chunking. Otherwise we must set it; specially to zero, so // that jetty does not forcefully close the connection after each // message in order to complete the transaction http-1.0-style. boolean mustSetContentLength = true; try { // get the incoming message is there is one if (currentRequest.messageContent) { Message incomingMessage; // read the stream InputStream in = req.getInputStream(); // construct the message. Send BAD_REQUEST if the message construction // fails try { String contentType = req.getContentType(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Reading message from request : " + contentType); } MimeMediaType contentMimeType = EndpointServiceImpl.DEFAULT_MESSAGE_TYPE; if (null != contentType) { contentMimeType = MimeMediaType.valueOf(contentType); } // FIXME 20040927 bondolo Should get message encoding from http header. try { incomingMessage = WireFormatMessageFactory.fromWire(in, contentMimeType, null); } catch (NoSuchElementException noValidWireFormat) { IOException failure = new IOException("Unrecognized content type MIME type : " + contentType); failure.initCause(noValidWireFormat); throw failure; } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { lastReadWriteTime = TimeUtils.timeNow(); long receiveTime = TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), currentRequest.requestStartTime); transportBindingMeter.messageReceived(false, incomingMessage, receiveTime, 0); // size=0 since it was already incorporated in the request size } } catch (IOException e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Malformed JXTA message, responding with BAD_REQUEST", e); } res.sendError(HttpServletResponse.SC_BAD_REQUEST, "Message was not a valid JXTA message"); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(false, TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), currentRequest.requestStartTime)); } return; } // post the incoming message to the endpoint demux if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Handing " + incomingMessage + " to the endpoint."); } try { endpoint.demux(incomingMessage); } catch (Throwable e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure demuxing an incoming message", e); } } } boolean beganResponse = false; // Check if the back channel is to be used for sending messages. if ((currentRequest.responseTimeout >= 0) && (null != messenger)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Wait for message from the messenger. timeout = " + currentRequest.responseTimeout); } long quitAt = (currentRequest.responseTimeout == 0) ? Long.MAX_VALUE : TimeUtils.toAbsoluteTimeMillis(currentRequest.requestStartTime, currentRequest.responseTimeout); while ((0 != (messenger.getState() & Messenger.USABLE)) && !destroyed) { long remaining = TimeUtils.toRelativeTimeMillis(quitAt); if ((remaining <= 0)) { // done processing the request if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Terminating expired request."); } // We know we did not respond anything. // In general it's better if jetty closes the connection // here, because it could have been an unused // back-channel and the client has to open a new one // next time, thus making sure we get to see a different // URL (if applicable). JDK should do that anyway, // but ...). break; } Message outMsg; // Send a message if there is one try { outMsg = messenger.waitForMessage(remaining); } catch (InterruptedException ie) { // Ok. Leave early, then. Thread.interrupted(); continue; } if (outMsg == null) { // done processing the request if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Terminating request with no message to send."); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(false, TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), currentRequest.requestStartTime)); } // We know we did not respond anything. Do not set // content-length In general it's better if jetty closes // the connection here, because it could have been an // unused back-channel and the client has to open a new // one next time, thus making sure we get to see a // different URL (if applicable). JDK should do that // anyway, but ...). break; } long startMessageSend = TimeUtils.timeNow(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + outMsg + " on back channel to " + req.getRemoteHost()); } if (!beganResponse) { // valid request, send back OK response if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending OK in response to request"); } beganResponse = true; res.setStatus(HttpServletResponse.SC_OK); res.setContentType(EndpointServiceImpl.DEFAULT_MESSAGE_TYPE.toString()); } // send the message WireFormatMessage serialed = WireFormatMessageFactory.toWire(outMsg, EndpointServiceImpl.DEFAULT_MESSAGE_TYPE, null); // if only one message is being returned, set the content // length, otherwise try to use chunked encoding. if (currentRequest.extraResponsesTimeout < 0) { res.setContentLength((int) serialed.getByteLength()); } // Either way, we've done what had to be done. mustSetContentLength = false; // get the output stream for the response OutputStream out = res.getOutputStream(); // send the message try { serialed.sendToStream(out); out.flush(); messenger.messageSent(true); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Successfully sent " + outMsg + " on back channel to " + req.getRemoteHost()); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { lastReadWriteTime = TimeUtils.timeNow(); long sendTime = TimeUtils.toRelativeTimeMillis(lastReadWriteTime, startMessageSend); long bytesSent = serialed.getByteLength(); transportBindingMeter.messageSent(false, outMsg, sendTime, bytesSent); } } catch (IOException ex) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Failed sending Message on back channel to " + req.getRemoteHost()); } messenger.messageSent(false); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(false, TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), currentRequest.requestStartTime)); } throw ex; } finally { // make sure the response is pushed out res.flushBuffer(); } // Adjust the quit time based upon the extra response time available. if (0 == currentRequest.extraResponsesTimeout) { quitAt = Long.MAX_VALUE; } else { quitAt = TimeUtils.toAbsoluteTimeMillis(currentRequest.requestStartTime, currentRequest.extraResponsesTimeout); } // If we never generated a response then make it clear we gave up waiting. if (!beganResponse) { res.setStatus(HttpServletResponse.SC_NO_CONTENT); } } } else { // No response was desired. res.setStatus(HttpServletResponse.SC_OK); } } finally { // close the messenger if (null != messenger) { messenger.close(); } } // If contentLength was never set and we have not decided *not* to set // it, then we must set it to 0 (that's the truth in that case). This // allows Jetty to keep to keep the connection open unless what's on the // other side is a 1.0 proxy. if (mustSetContentLength) { res.setContentLength(0); } // make sure the response is pushed out res.flushBuffer(); // done processing the request if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Finished processing the request from " + req.getRemoteHost()); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(false, TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), currentRequest.requestStartTime)); } } /** * Returns a response to a ping request. The response is the PeerID of * this peer. * * @param res The response to which the ping result should be sent. */ private void pingResponse(HttpServletResponse res) throws IOException { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Responding to \'ping\' request with 200 and peerID"); } res.setStatus(HttpServletResponse.SC_OK); res.setContentLength(pingResponseBytes.length); res.setContentType(MimeMediaType.TEXTUTF8.toString()); OutputStream out = res.getOutputStream(); out.write(pingResponseBytes); out.flush(); out.close(); } /** * Debugging output. */ private static void printRequest(HttpServletRequest req) { final char nl = '\n'; StringBuilder builder = new StringBuilder(); builder.append("HTTP request:" + nl); builder.append(" AUTH_TYPE: ").append(req.getAuthType()).append(nl); builder.append(" CONTEXT_PATH: ").append(req.getContextPath()).append(nl);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -