📄 httpclientmessenger.java
字号:
if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Stop polling for " + senderURL); } stopped = true; // Here, we are forced to abandon this object open. Because we could // get blocked forever trying to close it. It will rot away after // the current read returns. The best we can do is interrupt the // thread; unlikely to have an effect per the current. // HttpURLConnection implementation. Thread stopPoller = pollerThread; if (null != stopPoller) { stopPoller.interrupt(); } } /** * Returns {@code true} if this messenger is stopped otherwise * {@code false}. * * @return returns {@code true} if this messenger is stopped otherwise * {@code false}. */ protected boolean isStopped() { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(this + " " + senderURL + " --> " + (stopped ? "stopped" : "running")); } return stopped; } /** * {@inheritDoc} * * <p/>Connects to the http server and waits for messages to be received and processes them. */ public void run() { try { long beginConnectTime = 0; long connectTime = 0; long noReconnectBefore = 0; HttpURLConnection conn = null; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Message polling beings for " + pollingURL); } int connectAttempt = 1; // get messages until the messenger is closed while (!isStopped()) { if (conn == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Opening new connection to " + pollingURL); } conn = (HttpURLConnection) pollingURL.openConnection(); // Incomming data channel conn.setRequestMethod("GET"); conn.setDoOutput(false); conn.setDoInput(true); conn.setAllowUserInteraction(false); conn.setUseCaches(false); conn.setConnectTimeout(CONNECT_TIMEOUT); conn.setReadTimeout(RESPONSE_TIMEOUT); if (TransportMeterBuildSettings.TRANSPORT_METERING) { beginConnectTime = TimeUtils.timeNow(); } // Loop back and try again to connect continue; } long untilNextConnect = TimeUtils.toRelativeTimeMillis(noReconnectBefore); try { if (untilNextConnect > 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Delaying for " + untilNextConnect + "ms before reconnect to " + senderURL); } Thread.sleep(untilNextConnect); } } catch (InterruptedException woken) { Thread.interrupted(); continue; } InputStream inputStream; MimeMediaType messageType; try { if (connectAttempt > 1) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Reconnect attempt for " + senderURL); } } // Always connect (no cost if connected). conn.connect(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting for response code from " + senderURL); } int responseCode = conn.getResponseCode(); if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { LOG.finer( "Response " + responseCode + " for Connection : " + senderURL + "\n\tContent-Type : " + conn.getHeaderField("Content-Type") + "\tContent-Length : " + conn.getHeaderField("Content-Length") + "\tTransfer-Encoding : " + conn.getHeaderField("Transfer-Encoding")); } connectTime = TimeUtils.timeNow(); noReconnectBefore = TimeUtils.toAbsoluteTimeMillis(MIMIMUM_POLL_INTERVAL, connectTime); if (0 == conn.getContentLength()) { continue; } if (HttpURLConnection.HTTP_NO_CONTENT == responseCode) { // the connection timed out. if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.toRelativeTimeMillis(beginConnectTime, connectTime)); } conn = null; continue; } if (responseCode != HttpURLConnection.HTTP_OK) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime); } throw new IOException("HTTP Failure: " + conn.getResponseCode() + " : " + conn.getResponseMessage()); } String contentType = conn.getHeaderField("Content-Type"); if (null == contentType) { // XXX 20051219 bondolo Figure out why the mime type is not always set. messageType = EndpointServiceImpl.DEFAULT_MESSAGE_TYPE; } else { messageType = MimeMediaType.valueOf(contentType); } // FIXME 20040907 bondolo Should get message content-encoding from http header. inputStream = conn.getInputStream(); // reset connection attempt. connectAttempt = 1; } catch (InterruptedIOException broken) { // We don't know where it was interrupted. Restart connection. Thread.interrupted(); if (connectAttempt > CONNECT_RETRIES) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Unable to connect to " + senderURL); } stop(); break; } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Failed connecting to " + senderURL); } if (null != conn) { conn.disconnect(); } conn = null; connectAttempt++; continue; } } catch (IOException ioe) { if (connectAttempt > CONNECT_RETRIES) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Unable to connect to " + senderURL, ioe); } stop(); break; } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Failed connecting to " + senderURL); } if (null != conn) { conn.disconnect(); } conn = null; connectAttempt++; continue; } } // start receiving messages try { while (!isStopped() && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), connectTime) < RESPONSE_TIMEOUT)) { // read a message! long messageReceiveStart = TimeUtils.timeNow(); Message incomingMsg; incomingMsg = WireFormatMessageFactory.fromWire(inputStream, messageType, null); if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.messageReceived(true, incomingMsg, incomingMsg.getByteLength(), TimeUtils.timeNow() - messageReceiveStart); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received " + incomingMsg + " from " + senderURL); } servletHttpTransport.executor.execute(new MessageProcessor(incomingMsg)); // note that we received a message lastUsed = TimeUtils.timeNow(); } if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime); } } catch (EOFException e) { // Connection ran out of messages. let it go. conn = null; } catch (InterruptedIOException broken) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } // We don't know where it was interrupted. Restart connection. Thread.interrupted(); if (null != conn) { conn.disconnect(); } conn = null; } catch (IOException e) { if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) { transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime); } // If we managed to get down here, it is really an error. // However, being disconnected from the server, for // whatever reason, is a common place event. No need to // clutter the screen with scary messages. When the // message layer believes it's serious, it prints the // scary message already. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Failed to read message from " + senderURL, e); } // Time to call this connection dead. stop(); break; } finally { try { inputStream.close(); } catch (IOException ignored) { //ignored } } } } catch (Throwable argh) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Poller exiting because of uncaught exception", argh); } stop(); } finally { pollerThread = null; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Message polling stopped for " + senderURL); } } } /** * A small class for processing individual messages. */ private class MessageProcessor implements Runnable { private Message msg; MessageProcessor(Message msg) { this.msg = msg; } public void run() { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Demuxing " + msg + " from " + senderURL); } servletHttpTransport.getEndpointService().demux(msg); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -