⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 httpclientmessenger.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                        stopped = true;                        // Here, we are forced to abandon this object open. Because we could            // get blocked forever trying to close it. It will rot away after            // the current read returns. The best we can do is interrupt the            // thread; unlikely to have an effect per the current.            // HttpURLConnection implementation.                        Thread stopPoller = pollerThread;                        if( null != stopPoller ) {                stopPoller.interrupt();            }        }                /**         *  Returns {@code true} if this messenger is stopped otherwise          *  {@code false}.         *         *  @return returns {@code true} if this messenger is stopped otherwise          *  {@code false}.         **/        protected boolean isStopped() {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug( this + " " +senderURL + " --> " + (stopped ? "stopped" : "running"));            }                        return stopped;        }                /**         *  {@inheritDoc}         *         *  <p/>Connects to the http server and waits for messages to be received and processes them.         **/        public void run() {            try {                long beginConnectTime = 0;                long connectTime = 0;                long noReconnectBefore = 0;                HttpURLConnection conn = null;                                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("Message polling beings for " + pollingURL);                }                                int connectAttempt = 1;                                // get messages until the messenger is closed                while (!isStopped()) {                    if (conn == null) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Opening new connection to " + pollingURL);                        }                                                conn = (HttpURLConnection) pollingURL.openConnection(); // Incomming data channel                                                conn.setRequestMethod("GET");                        conn.setDoOutput(false);                        conn.setDoInput(true);                        conn.setAllowUserInteraction(false);                        conn.setUseCaches(false);                        conn.setConnectTimeout( CONNECT_TIMEOUT );                        conn.setReadTimeout( RESPONSE_TIMEOUT );                                                if (TransportMeterBuildSettings.TRANSPORT_METERING) {                            beginConnectTime = TimeUtils.timeNow();                        }                                                // Loop back and try again to connect                        continue;                    }                                        long untilNextConnect = TimeUtils.toRelativeTimeMillis( noReconnectBefore );                                        try {                        if( untilNextConnect > 0) {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("Delaying for " + untilNextConnect + "ms before reconnect to " + senderURL );                            }                            Thread.sleep(untilNextConnect);                        }                    } catch(InterruptedException woken) {                        Thread.interrupted();                        continue;                    }                                        InputStream inputStream;                    MimeMediaType messageType;                                        try {                        if( connectAttempt > 1 ) {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("Reconnect attempt for " + senderURL);                            }                        }                                                // Always connect (no cost if connected).                        conn.connect();                                                if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Waiting for response code from " + senderURL);                        }                                                int responseCode = conn.getResponseCode();                                                if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug( "Response " + responseCode + " for Connection : " + senderURL                                    + "\n\tContent-Type : " + conn.getHeaderField("Content-Type")                                    + "\tContent-Length : " + conn.getHeaderField("Content-Length")                                    + "\tTransfer-Encoding : " + conn.getHeaderField("Transfer-Encoding"));                        }                                                connectTime = TimeUtils.timeNow();                        noReconnectBefore = TimeUtils.toAbsoluteTimeMillis( MIMIMUM_POLL_INTERVAL, connectTime );                                                if( 0 == conn.getContentLength() ) {                            continue;                        }                                                if( HttpURLConnection.HTTP_NO_CONTENT == responseCode) {                            // the connection timed out.                            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                                transportBindingMeter.connectionClosed(true, TimeUtils.toRelativeTimeMillis(beginConnectTime, connectTime));                            }                                                        conn = null;                            continue;                        }                                                if (responseCode != HttpURLConnection.HTTP_OK ) {                            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                                transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime);                            }                                                        throw new IOException("HTTP Failure: " + conn.getResponseCode() + " : " + conn.getResponseMessage());                        }                                                String contentType = conn.getHeaderField("Content-Type");                        if( null == contentType ) {                            // XXX 20051219 bondolo Figure out why the mime type is not always set.                            messageType = EndpointServiceImpl.DEFAULT_MESSAGE_TYPE;                        } else {                            messageType = MimeMediaType.valueOf( contentType );                        }                                                // FIXME 20040907 bondolo Should get message content-encoding from http header.                                                inputStream = conn.getInputStream();                                                // reset connection attempt.                        connectAttempt = 1;                    } catch ( InterruptedIOException broken ) {                        // We don't know where it was interrupted. Restart connection.                        Thread.interrupted();                                                if( connectAttempt > CONNECT_RETRIES) {                            if (LOG.isEnabledFor(Level.WARN)) {                                LOG.warn("Unable to connect to " + senderURL );                            }                                                        stop();                            break;                        }  else {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("Failed connecting to " + senderURL );                            }                                                        if( null != conn ) {                                conn.disconnect();                            }                            conn = null;                            connectAttempt++;                            continue;                        }                    } catch (IOException ioe) {                        if( connectAttempt > CONNECT_RETRIES ) {                            if (LOG.isEnabledFor(Level.WARN)) {                                LOG.warn("Unable to connect to " + senderURL, ioe);                            }                                                        stop();                            break;                        }  else {                            if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("Failed connecting to " + senderURL );                            }                                                        if( null != conn ) {                                conn.disconnect();                            }                            conn = null;                            connectAttempt++;                            continue;                        }                    }                                        // start receiving messages                    try {                        while (!isStopped() && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), connectTime) < RESPONSE_TIMEOUT)) {                            // read a message!                            long messageReceiveStart = TimeUtils.timeNow();                            Message incomingMsg = null;                                                        incomingMsg = WireFormatMessageFactory.fromWire(inputStream, messageType, null);                                                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                                transportBindingMeter.messageReceived(true, incomingMsg, incomingMsg.getByteLength(), TimeUtils.timeNow() - messageReceiveStart );                            }                                                        if (LOG.isEnabledFor(Level.DEBUG)) {                                LOG.debug("Received " + incomingMsg + " from " + senderURL);                            }                                                        try {                                HttpClientMessenger.this.servletHttpTransport.getEndpointService().demux(incomingMsg);                            } catch (Throwable e) {                                if (LOG.isEnabledFor(Level.WARN)) {                                    LOG.warn("Failure demuxing an incoming message", e);                                }                                                                throw e;                            }                                                        // note that we received a message                            lastUsed = TimeUtils.timeNow();                        }                                                //                        // FIXME 20060105 bondolo Relay debugging impedement.//                        try {//                            Thread.sleep( 10 * TimeUtils.ASECOND );//                        } catch(InterruptedException woken) {//                            Thread.interrupted();//                            continue;//                       }                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                            transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime);                        }                    } catch (EOFException e) {                        // Connection ran out of messages. let it go.                        conn = null;                        continue;                    } catch ( InterruptedIOException broken ) {                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                            transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime);                        }                                                // We don't know where it was interrupted. Restart connection.                        Thread.interrupted();                        if( null != conn ) {                            conn.disconnect();                        }                        conn = null;                        continue;                    } catch (IOException e) {                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                            transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime);                        }                                                // If we managed to get down here, it is really an error.                        // However, being disconnected from the server, for                        // whatever reason, is a common place event. No need to                        // clutter the screen with scary messages. When the                        // message layer believes it's serious, it prints the                        // scary message already.                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Failed to read message from " + senderURL, e);                        }                        // Time to call this connection dead.                        stop();                        break;                    } finally {                        try {                            inputStream.close();                        } catch( IOException ignored ) {                            ;                        }                    }                }            } catch (Throwable argh) {                if (LOG.isEnabledFor(Level.ERROR)) {                    LOG.error("Poller exiting because of uncaught exception", argh);                }                stop();            } finally {                pollerThread = null;            }                        if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("Message polling stopped for " + senderURL);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -