📄 httpclientmessenger.java
字号:
**/ public EndpointAddress getLogicalDestinationImpl() { return logicalDest; } /** * {@inheritDoc} **/ public boolean isIdleImpl() { return isClosed() || (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), lastUsed) > MESSENGER_IDLE_TIMEOUT); } /** * Connects to the http server and retreives the Logical Destination Address **/ private final EndpointAddress retreiveLogicalDestinationAddress() throws IOException { long beginConnectTime = 0; long connectTime = 0; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Ping (" + senderURL + ")"); } if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } // open a connection to the other end HttpURLConnection urlConn = (HttpURLConnection) senderURL.openConnection(); urlConn.setRequestMethod("GET"); urlConn.setDoOutput(true); urlConn.setDoInput(true); urlConn.setAllowUserInteraction(false); urlConn.setUseCaches(false); urlConn.setConnectTimeout( CONNECT_TIMEOUT ); urlConn.setReadTimeout( CONNECT_TIMEOUT ); try { // this is where the connection is actually made, if not already // connected. If we can't connect, assume it is dead int code = urlConn.getResponseCode(); if (code != HttpURLConnection.HTTP_OK) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { transportBindingMeter = servletHttpTransport.getTransportBindingMeter(null, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(true, TimeUtils.timeNow() - beginConnectTime); } } throw new IOException("Message not accepted: HTTP status " + "code=" + code + " reason=" + urlConn.getResponseMessage()); } // check for a returned peerId int msglength = urlConn.getContentLength(); if (msglength <= 0) { throw new IOException( "Ping response was empty." ); } InputStream inputStream = urlConn.getInputStream(); // read the peerId byte[] uniqueIdBytes = new byte[msglength]; int bytesRead = 0; while (bytesRead < msglength) { int thisRead = inputStream.read(uniqueIdBytes, bytesRead, msglength - bytesRead); if ( thisRead < 0 ) { break; } bytesRead += thisRead; } if (bytesRead < msglength) { throw new IOException("Content ended before promised Content length"); } String uniqueIdString; try { uniqueIdString = new String( uniqueIdBytes, "UTF-8" ); } catch( UnsupportedEncodingException never ) { // utf-8 is always available, but we handle it anyway. uniqueIdString = new String( uniqueIdBytes ); } if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectTime = TimeUtils.timeNow(); transportBindingMeter = servletHttpTransport.getTransportBindingMeter(uniqueIdString, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionEstablished(true, connectTime - beginConnectTime); transportBindingMeter.ping(connectTime); transportBindingMeter.connectionClosed(true, connectTime - beginConnectTime); } } EndpointAddress remoteAddress = new EndpointAddress("jxta", uniqueIdString.trim(), null, null); if (LOG.isEnabledFor(Level.TRACE)) { LOG.trace("Ping (" + senderURL + ") -> " + remoteAddress); } return remoteAddress; } catch (IOException failure) { if (TransportMeterBuildSettings.TRANSPORT_METERING) { connectTime = TimeUtils.timeNow(); transportBindingMeter = servletHttpTransport.getTransportBindingMeter(null, getDestinationAddress()); if (transportBindingMeter != null) { transportBindingMeter.connectionFailed(true, connectTime - beginConnectTime); } } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Ping (" + senderURL + ") -> failed" ); } throw failure; } } /** * Connects to the http server and POSTs the message **/ private boolean doSend(Message msg) throws IOException { long beginConnectTime = 0; long connectTime = 0; if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, EndpointServiceImpl.DEFAULT_MESSAGE_TYPE, (MimeMediaType[]) null); for (int connectAttempt = 1; connectAttempt <= CONNECT_RETRIES; connectAttempt++) { if( connectAttempt > 1 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Retrying connection to " + senderURL); } } // open a connection to the other end HttpURLConnection urlConn = (HttpURLConnection) senderURL.openConnection(); try { urlConn.setRequestMethod("POST"); urlConn.setDoOutput(true); urlConn.setDoInput(true); urlConn.setAllowUserInteraction(false); urlConn.setUseCaches(false); urlConn.setConnectTimeout( CONNECT_TIMEOUT ); urlConn.setReadTimeout( CONNECT_TIMEOUT ); // FIXME 20040907 bondolo Should set message encoding http header. urlConn.setRequestProperty("content-length", Integer.toString((int) serialed.getByteLength())); urlConn.setRequestProperty("content-type", serialed.getMimeType().toString()); // send the message OutputStream out = urlConn.getOutputStream(); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { connectTime = TimeUtils.timeNow(); transportBindingMeter.connectionEstablished(true, connectTime - beginConnectTime); } serialed.sendToStream(out); out.flush(); int responseCode; try { responseCode = urlConn.getResponseCode(); } catch( SocketTimeoutException expired ) { // maybe a retry will help. continue; } catch (IOException ioe) { // Could not connect. This seems to happen a lot with a loaded HTTP 1.0 // proxy. Apparently, HttpUrlConnection can be fooled by the proxy // in believing that the connection is still open and thus breaks // when attempting to make a second transaction. We should not have to but it // seems that it befalls us to retry. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("HTTP 1.0 proxy seems in use"); } // maybe a retry will help. continue; } // NOTE: If the proxy closed the connection 1.0 style without returning // a status line, we do not get an exception: we get a -1 response code. // Apparently, proxies no-longer do that anymore. Just in case, we issue a // warning and treat it as OK.71 if (responseCode == -1) { if (neverWarned && LOG.isEnabledFor(Level.WARN)) { LOG.warn("Obsolete HTTP proxy does not issue HTTP_OK response. Assuming OK"); neverWarned = false; } responseCode = HttpURLConnection.HTTP_OK; } if (responseCode != HttpURLConnection.HTTP_OK) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.dataSent(true, serialed.getByteLength()); transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } throw new IOException("Message not accepted: HTTP status " + "code=" + responseCode + " reason=" + urlConn.getResponseMessage()); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { long messageSentTime = TimeUtils.timeNow(); transportBindingMeter.messageSent(true, msg, messageSentTime - connectTime, serialed.getByteLength()); transportBindingMeter.connectionClosed(true, messageSentTime - beginConnectTime); } // note that we successfully sent a message lastUsed = TimeUtils.timeNow(); return true; } finally { // This does prevent the creation of an infinite number of connections // if we happen to be going through a 1.0-only proxy or connect to a server // that still does not set content length to zero for the response. With this, at // least we close them (they eventualy close anyway because the other side closes // them but it takes too much time). If content-length is set, then jdk ignores // the disconnect AND reuses the connection, which is what we want. urlConn.disconnect(); } } return false; } /** * Polls for messages sent to us. **/ private class MessagePoller implements Runnable { /** * If <tt>true</tt> then this poller is stopped or stopping. */ private volatile boolean stopped = false; /** * The thread that does the work. **/ private Thread pollerThread; /** * The URL we poll for messages. **/ private final URL pollingURL; MessagePoller( String pollAddress, EndpointAddress destAddr ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("new MessagePoller for " + senderURL); } /* * query string is of the format ?{response timeout},{extra response timeout},{dest address} * * The timeout's are expressed in milliseconds. -1 means do not wait * at all, 0 means wait forever. */ try { pollingURL = new URL( senderURL, "/" + pollAddress + "?" + Integer.toString(RESPONSE_TIMEOUT) + "," + Integer.toString(EXTRA_RESPONSE_TIMEOUT) + "," + destAddr ); } catch( MalformedURLException badAddr ) { IllegalArgumentException failure = new IllegalArgumentException( "Could not construct polling URL" ); failure.initCause( badAddr ); throw failure; } pollerThread = new Thread(this, "HttpClientMessenger poller for " + senderURL); pollerThread.setDaemon(true); pollerThread.start(); } protected void stop() { if (stopped) { return; } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Stop polling for " + senderURL); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -