📄 httpclientmessenger.java
字号:
destAddressToUse.toString(), null); message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement); try { doSend(message); } catch (IOException e) { // close this messenger close(); // rethrow the exception throw e; } } /** * {@inheritDoc} */ @Override public EndpointAddress getLogicalDestinationImpl() { return logicalDest; } /** * {@inheritDoc} */ @Override public boolean isIdleImpl() { return isClosed() || (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastUsed) > MESSENGER_IDLE_TIMEOUT); } /** * Connects to the http server and retrieves the Logical Destination Address */ private EndpointAddress retreiveLogicalDestinationAddress() throws IOException { long beginConnectTime = 0; long connectTime = 0; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ping (" + senderURL + ")"); } if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } // open a connection to the other end HttpURLConnection urlConn = (HttpURLConnection) senderURL.openConnection(); urlConn.setRequestMethod("GET"); urlConn.setDoOutput(true); urlConn.setDoInput(true); urlConn.setAllowUserInteraction(false); urlConn.setUseCaches(false); urlConn.setConnectTimeout(CONNECT_TIMEOUT); urlConn.setReadTimeout(CONNECT_TIMEOUT); try { // this is where the connection is actually made, if not already // connected. If we can't connect, assume it is dead int code = urlConn.getResponseCode(); if (code != HttpURLConnection.HTTP_OK) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = servletHttpTransport.getTransportBindingMeter(null, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(true, TimeUtils.timeNow() - beginConnectTime); } } throw new IOException("Message not accepted: HTTP status " + "code=" + code + " reason=" + urlConn.getResponseMessage()); } // check for a returned peerId int msglength = urlConn.getContentLength(); if (msglength <= 0) { throw new IOException("Ping response was empty."); } InputStream inputStream = urlConn.getInputStream(); // read the peerId byte[] uniqueIdBytes = new byte[msglength]; int bytesRead = 0; while (bytesRead < msglength) { int thisRead = inputStream.read(uniqueIdBytes, bytesRead, msglength - bytesRead); if (thisRead < 0) { break; } bytesRead += thisRead; } if (bytesRead < msglength) { throw new IOException("Content ended before promised Content length"); } String uniqueIdString; try { uniqueIdString = new String(uniqueIdBytes, "UTF-8"); } catch (UnsupportedEncodingException never) { // utf-8 is always available, but we handle it anyway. uniqueIdString = new String(uniqueIdBytes); } if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectTime = TimeUtils.timeNow(); transportBindingMeter = servletHttpTransport.getTransportBindingMeter(uniqueIdString, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionEstablished(true, connectTime - beginConnectTime); transportBindingMeter.ping(connectTime); transportBindingMeter.connectionClosed(true, connectTime - beginConnectTime); } } EndpointAddress remoteAddress = new EndpointAddress("jxta", uniqueIdString.trim(), null, null); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ping (" + senderURL + ") -> " + remoteAddress); } return remoteAddress; } catch (IOException failure) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectTime = TimeUtils.timeNow(); transportBindingMeter = servletHttpTransport.getTransportBindingMeter(null, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(true, connectTime - beginConnectTime); } } if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Ping (" + senderURL + ") -> failed"); } throw failure; } } /** * Connects to the http server and POSTs the message */ private void doSend(Message msg) throws IOException { long beginConnectTime = 0; long connectTime = 0; if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, EndpointServiceImpl.DEFAULT_MESSAGE_TYPE, null); for (int connectAttempt = 1; connectAttempt <= CONNECT_RETRIES; connectAttempt++) { if (connectAttempt > 1) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Retrying connection to " + senderURL); } } // open a connection to the other end HttpURLConnection urlConn = (HttpURLConnection) senderURL.openConnection(); try { urlConn.setRequestMethod("POST"); urlConn.setDoOutput(true); urlConn.setDoInput(true); urlConn.setAllowUserInteraction(false); urlConn.setUseCaches(false); urlConn.setConnectTimeout(CONNECT_TIMEOUT); urlConn.setReadTimeout(CONNECT_TIMEOUT); // FIXME 20040907 bondolo Should set message encoding http header. urlConn.setRequestProperty("content-length", Long.toString(serialed.getByteLength())); urlConn.setRequestProperty("content-type", serialed.getMimeType().toString()); // send the message OutputStream out = urlConn.getOutputStream(); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { connectTime = TimeUtils.timeNow(); transportBindingMeter.connectionEstablished(true, connectTime - beginConnectTime); } serialed.sendToStream(out); out.flush(); int responseCode; try { responseCode = urlConn.getResponseCode(); } catch (SocketTimeoutException expired) { // maybe a retry will help. continue; } catch (IOException ioe) { // Could not connect. This seems to happen a lot with a loaded HTTP 1.0 // proxy. Apparently, HttpUrlConnection can be fooled by the proxy // in believing that the connection is still open and thus breaks // when attempting to make a second transaction. We should not have to but it // seems that it befalls us to retry. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("HTTP 1.0 proxy seems in use"); } // maybe a retry will help. continue; } // NOTE: If the proxy closed the connection 1.0 style without returning // a status line, we do not get an exception: we get a -1 response code. // Apparently, proxies no-longer do that anymore. Just in case, we issue a // warning and treat it as OK.71 if (responseCode == -1) { if (neverWarned && Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Obsolete HTTP proxy does not issue HTTP_OK response. Assuming OK"); neverWarned = false; } responseCode = HttpURLConnection.HTTP_OK; } if (responseCode != HttpURLConnection.HTTP_OK) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.dataSent(true, serialed.getByteLength()); transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } throw new IOException( "Message not accepted: HTTP status " + "code=" + responseCode + " reason=" + urlConn.getResponseMessage()); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { long messageSentTime = TimeUtils.timeNow(); transportBindingMeter.messageSent(true, msg, messageSentTime - connectTime, serialed.getByteLength()); transportBindingMeter.connectionClosed(true, messageSentTime - beginConnectTime); } // note that we successfully sent a message lastUsed = TimeUtils.timeNow(); return; } finally { // This does prevent the creation of an infinite number of connections // if we happen to be going through a 1.0-only proxy or connect to a server // that still does not set content length to zero for the response. With this, at // least we close them (they eventualy close anyway because the other side closes // them but it takes too much time). If content-length is set, then jdk ignores // the disconnect AND reuses the connection, which is what we want. urlConn.disconnect(); } } throw new IOException("Failed sending " + msg + " to " + senderURL); } /** * Polls for messages sent to us. */ private class MessagePoller implements Runnable { /** * If <tt>true</tt> then this poller is stopped or stopping. */ private volatile boolean stopped = false; /** * The thread that does the work. */ private Thread pollerThread; /** * The URL we poll for messages. */ private final URL pollingURL; MessagePoller(String pollAddress, EndpointAddress destAddr) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("new MessagePoller for " + senderURL); } /* * query string is of the format ?{response timeout},{extra response timeout},{dest address} * * The timeout's are expressed in milliseconds. -1 means do not wait * at all, 0 means wait forever. */ try { pollingURL = new URL(senderURL, "/" + pollAddress + "?" + Integer.toString(RESPONSE_TIMEOUT) + "," + Integer.toString(EXTRA_RESPONSE_TIMEOUT) + "," + destAddr); } catch (MalformedURLException badAddr) { IllegalArgumentException failure = new IllegalArgumentException("Could not construct polling URL"); failure.initCause(badAddr); throw failure; } pollerThread = new Thread(this, "HttpClientMessenger poller for " + senderURL); pollerThread.setDaemon(true); pollerThread.start(); } protected void stop() { if (stopped) { return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -