📄 httpclientmessenger.java
字号:
stopped = true; // Here, we are forced to abandon this object open. Because we could // get blocked forever trying to close it. It will rot away after // the current read returns. The best we can do is interrupt the // thread; unlikely to have an effect per the current. // HttpURLConnection implementation. Thread stopPoller = pollerThread; if( null != stopPoller ) { stopPoller.interrupt(); } } /** * Returns {@code true} if this messenger is stopped otherwise * {@code false}. * * @return returns {@code true} if this messenger is stopped otherwise * {@code false}. **/ protected boolean isStopped() { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( this + " " +senderURL + " --> " + (stopped ? "stopped" : "running")); } return stopped; } /** * {@inheritDoc} * * <p/>Connects to the http server and waits for messages to be received and processes them. **/ public void run() { try { long beginConnectTime = 0; long connectTime = 0; long noReconnectBefore = 0; HttpURLConnection conn = null; if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Message polling beings for " + pollingURL); } int connectAttempt = 1; // get messages until the messenger is closed while (!isStopped()) { if (conn == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Opening new connection to " + pollingURL); } conn = (HttpURLConnection) pollingURL.openConnection(); // Incomming data channel conn.setRequestMethod("GET"); conn.setDoOutput(false); conn.setDoInput(true); conn.setAllowUserInteraction(false); conn.setUseCaches(false); conn.setConnectTimeout( CONNECT_TIMEOUT ); conn.setReadTimeout( RESPONSE_TIMEOUT ); if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } // Loop back and try again to connect continue; } long untilNextConnect = TimeUtils.toRelativeTimeMillis( noReconnectBefore ); try { if( untilNextConnect > 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Delaying for " + untilNextConnect + "ms before reconnect to " + senderURL ); } Thread.sleep(untilNextConnect); } } catch(InterruptedException woken) { Thread.interrupted(); continue; } InputStream inputStream; MimeMediaType messageType; try { if( connectAttempt > 1 ) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Reconnect attempt for " + senderURL); } } // Always connect (no cost if connected). conn.connect(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Waiting for response code from " + senderURL); } int responseCode = conn.getResponseCode(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "Response " + responseCode + " for Connection : " + senderURL + "\n\tContent-Type : " + conn.getHeaderField("Content-Type") + "\tContent-Length : " + conn.getHeaderField("Content-Length") + "\tTransfer-Encoding : " + conn.getHeaderField("Transfer-Encoding")); } connectTime = TimeUtils.timeNow(); noReconnectBefore = TimeUtils.toAbsoluteTimeMillis( MIMIMUM_POLL_INTERVAL, connectTime ); if( 0 == conn.getContentLength() ) { continue; } if( HttpURLConnection.HTTP_NO_CONTENT == responseCode) { // the connection timed out. if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.toRelativeTimeMillis(beginConnectTime, connectTime)); } conn = null; continue; } if (responseCode != HttpURLConnection.HTTP_OK ) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime); } throw new IOException("HTTP Failure: " + conn.getResponseCode() + " : " + conn.getResponseMessage()); } String contentType = conn.getHeaderField("Content-Type"); if( null == contentType ) { // XXX 20051219 bondolo Figure out why the mime type is not always set. messageType = EndpointServiceImpl.DEFAULT_MESSAGE_TYPE; } else { messageType = MimeMediaType.valueOf( contentType ); } // FIXME 20040907 bondolo Should get message content-encoding from http header. inputStream = conn.getInputStream(); // reset connection attempt. connectAttempt = 1; } catch ( InterruptedIOException broken ) { // We don't know where it was interrupted. Restart connection. Thread.interrupted(); if( connectAttempt > CONNECT_RETRIES) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unable to connect to " + senderURL ); } stop(); break; } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Failed connecting to " + senderURL ); } if( null != conn ) { conn.disconnect(); } conn = null; connectAttempt++; continue; } } catch (IOException ioe) { if( connectAttempt > CONNECT_RETRIES ) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unable to connect to " + senderURL, ioe); } stop(); break; } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Failed connecting to " + senderURL ); } if( null != conn ) { conn.disconnect(); } conn = null; connectAttempt++; continue; } } // start receiving messages try { while (!isStopped() && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), connectTime) < RESPONSE_TIMEOUT)) { // read a message! long messageReceiveStart = TimeUtils.timeNow(); Message incomingMsg = null; incomingMsg = WireFormatMessageFactory.fromWire(inputStream, messageType, null); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.messageReceived(true, incomingMsg, incomingMsg.getByteLength(), TimeUtils.timeNow() - messageReceiveStart ); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received " + incomingMsg + " from " + senderURL); } try { HttpClientMessenger.this.servletHttpTransport.getEndpointService().demux(incomingMsg); } catch (Throwable e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure demuxing an incoming message", e); } throw e; } // note that we received a message lastUsed = TimeUtils.timeNow(); } // // FIXME 20060105 bondolo Relay debugging impedement.// try {// Thread.sleep( 10 * TimeUtils.ASECOND );// } catch(InterruptedException woken) {// Thread.interrupted();// continue;// } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime); } } catch (EOFException e) { // Connection ran out of messages. let it go. conn = null; continue; } catch ( InterruptedIOException broken ) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } // We don't know where it was interrupted. Restart connection. Thread.interrupted(); if( null != conn ) { conn.disconnect(); } conn = null; continue; } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } // If we managed to get down here, it is really an error. // However, being disconnected from the server, for // whatever reason, is a common place event. No need to // clutter the screen with scary messages. When the // message layer believes it's serious, it prints the // scary message already. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Failed to read message from " + senderURL, e); } // Time to call this connection dead. stop(); break; } finally { try { inputStream.close(); } catch( IOException ignored ) { ; } } } } catch (Throwable argh) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Poller exiting because of uncaught exception", argh); } stop(); } finally { pollerThread = null; } if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Message polling stopped for " + senderURL); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -