⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 httpclientmessenger.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("Stop polling for " + senderURL);            }                        stopped = true;                        // Here, we are forced to abandon this object open. Because we could            // get blocked forever trying to close it. It will rot away after            // the current read returns. The best we can do is interrupt the            // thread; unlikely to have an effect per the current.            // HttpURLConnection implementation.                        Thread stopPoller = pollerThread;                        if (null != stopPoller) {                stopPoller.interrupt();            }        }                /**         *  Returns {@code true} if this messenger is stopped otherwise          *  {@code false}.         *         *  @return returns {@code true} if this messenger is stopped otherwise          *  {@code false}.         */        protected boolean isStopped() {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine(this + " " + senderURL + " --> " + (stopped ? "stopped" : "running"));            }                        return stopped;        }                /**         *  {@inheritDoc}         *         *  <p/>Connects to the http server and waits for messages to be received and processes them.         
*/
        public void run() {
            // Overall shape: an outer loop that (re)opens the polling connection
            // and validates the HTTP response, and an inner loop that reads
            // messages from that response until it ends, times out or fails.
            // Any uncaught Throwable stops the messenger; pollerThread is always
            // cleared on exit.
            try {
                long beginConnectTime = 0;
                long connectTime = 0;
                long noReconnectBefore = 0;
                HttpURLConnection conn = null;

                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                    LOG.info("Message polling beings for " + pollingURL);
                }

                int connectAttempt = 1;

                // get messages until the messenger is closed
                while (!isStopped()) {
                    if (conn == null) {
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine("Opening new connection to " + pollingURL);
                        }

                        conn = (HttpURLConnection) pollingURL.openConnection(); // Incoming data channel

                        conn.setRequestMethod("GET");
                        conn.setDoOutput(false);
                        conn.setDoInput(true);
                        conn.setAllowUserInteraction(false);
                        conn.setUseCaches(false);
                        conn.setConnectTimeout(CONNECT_TIMEOUT);
                        conn.setReadTimeout(RESPONSE_TIMEOUT);

                        if (TransportMeterBuildSettings.TRANSPORT_METERING) {
                            beginConnectTime = TimeUtils.timeNow();
                        }

                        // Loop back and try again to connect
                        continue;
                    }

                    long untilNextConnect = TimeUtils.toRelativeTimeMillis(noReconnectBefore);

                    try {
                        if (untilNextConnect > 0) {
                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                                LOG.fine("Delaying for " + untilNextConnect + "ms before reconnect to " + senderURL);
                            }
                            Thread.sleep(untilNextConnect);
                        }
                    } catch (InterruptedException woken) {
                        // Clear the interrupt status and re-evaluate the
                        // stopped flag at the top of the loop.
                        Thread.interrupted();
                        continue;
                    }

                    InputStream inputStream;
                    MimeMediaType messageType;

                    try {
                        if (connectAttempt > 1) {
                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                                LOG.fine("Reconnect attempt for " + senderURL);
                            }
                        }

                        // Always connect (no cost if connected).
                        conn.connect();

                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.fine("Waiting for response code from " + senderURL);
                        }

                        int responseCode = conn.getResponseCode();

                        if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) {
                            LOG.finer(
                                    "Response " + responseCode + " for Connection : " + senderURL + "\n\tContent-Type : "
                                    + conn.getHeaderField("Content-Type") + "\tContent-Length : "
                                    + conn.getHeaderField("Content-Length") + "\tTransfer-Encoding : "
                                    + conn.getHeaderField("Transfer-Encoding"));
                        }

                        connectTime = TimeUtils.timeNow();
                        noReconnectBefore = TimeUtils.toAbsoluteTimeMillis(MIMIMUM_POLL_INTERVAL, connectTime);

                        if (0 == conn.getContentLength()) {
                            // An HttpURLConnection serves exactly one request.
                            // Drop this one so the next pass opens a fresh
                            // connection instead of re-examining the same,
                            // already received, empty response forever.
                            conn = null;
                            continue;
                        }

                        if (HttpURLConnection.HTTP_NO_CONTENT == responseCode) {
                            // the connection timed out.
                            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                                // NOTE(review): the other metering calls report
                                // timeNow() - beginConnectTime; the argument
                                // order here looks reversed -- confirm against
                                // TimeUtils.toRelativeTimeMillis(long, long).
                                transportBindingMeter.connectionClosed(true, TimeUtils.toRelativeTimeMillis(beginConnectTime, connectTime));
                            }

                            conn = null;
                            continue;
                        }

                        if (responseCode != HttpURLConnection.HTTP_OK) {
                            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                                transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime);
                            }

                            throw new IOException("HTTP Failure: " + responseCode + " : " + conn.getResponseMessage());
                        }

                        String contentType = conn.getHeaderField("Content-Type");
                        if (null == contentType) {
                            // XXX 20051219 bondolo Figure out why the mime type is not always set.
                            messageType = EndpointServiceImpl.DEFAULT_MESSAGE_TYPE;
                        } else {
                            messageType = MimeMediaType.valueOf(contentType);
                        }

                        // FIXME 20040907 bondolo Should get message content-encoding from http header.

                        inputStream = conn.getInputStream();

                        // reset connection attempt.
                        connectAttempt = 1;
                    } catch (InterruptedIOException broken) {
                        // We don't know where it was interrupted. Restart connection.
                        Thread.interrupted();

                        if (connectAttempt > CONNECT_RETRIES) {
                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                                LOG.warning("Unable to connect to " + senderURL);
                            }

                            stop();
                            break;
                        } else {
                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                                LOG.fine("Failed connecting to " + senderURL);
                            }

                            if (null != conn) {
                                conn.disconnect();
                            }
                            conn = null;
                            connectAttempt++;
                            continue;
                        }
                    } catch (IOException ioe) {
                        if (connectAttempt > CONNECT_RETRIES) {
                            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                                LOG.log(Level.WARNING, "Unable to connect to " + senderURL, ioe);
                            }

                            stop();
                            break;
                        } else {
                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                                LOG.fine("Failed connecting to " + senderURL);
                            }

                            if (null != conn) {
                                conn.disconnect();
                            }
                            conn = null;
                            connectAttempt++;
                            continue;
                        }
                    }

                    // start receiving messages
                    try {
                        while (!isStopped()
                                && (TimeUtils.toRelativeTimeMillis(TimeUtils.timeNow(), connectTime) < RESPONSE_TIMEOUT)) {
                            // read a message!
                            long messageReceiveStart = TimeUtils.timeNow();
                            Message incomingMsg;

                            incomingMsg = WireFormatMessageFactory.fromWire(inputStream, messageType, null);

                            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                                transportBindingMeter.messageReceived(true, incomingMsg, incomingMsg.getByteLength(),
                                        TimeUtils.timeNow() - messageReceiveStart);
                            }

                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                                LOG.fine("Received " + incomingMsg + " from " + senderURL);
                            }

                            servletHttpTransport.executor.execute(new MessageProcessor(incomingMsg));

                            // note that we received a message
                            lastUsed = TimeUtils.timeNow();
                        }
                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                            transportBindingMeter.connectionClosed(true, TimeUtils.timeNow() - beginConnectTime);
                        }

                        // We are done with this response and its stream is
                        // closed below. Forget the connection so the next pass
                        // issues a new request rather than reading from the
                        // closed stream of this one.
                        conn = null;
                    } catch (EOFException e) {
                        // Connection ran out of messages. let it go.
                        conn = null;
                    } catch (InterruptedIOException broken) {
                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                            transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime);
                        }

                        // We don't know where it was interrupted. Restart connection.
                        Thread.interrupted();
                        if (null != conn) {
                            conn.disconnect();
                        }
                        conn = null;
                    } catch (IOException e) {
                        if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {
                            transportBindingMeter.connectionDropped(true, TimeUtils.timeNow() - beginConnectTime);
                        }

                        // If we managed to get down here, it is really an error.
                        // However, being disconnected from the server, for
                        // whatever reason, is a common place event. No need to
                        // clutter the screen with scary messages. When the
                        // message layer believes it's serious, it prints the
                        // scary message already.
                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                            LOG.log(Level.FINE, "Failed to read message from " + senderURL, e);
                        }

                        // Time to call this connection dead.
                        stop();
                        break;
                    } finally {
                        try {
                            inputStream.close();
                        } catch (IOException ignored) {
                            // Best-effort close; nothing useful can be done here.
                        }
                    }
                }
            } catch (Throwable argh) {
                if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                    LOG.log(Level.SEVERE, "Poller exiting because of uncaught exception", argh);
                }
                stop();
            } finally {
                pollerThread = null;
            }

            if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {
                LOG.info("Message polling stopped for " + senderURL);
            }
        }
    }

    /**
     * A small task that hands one received message to the endpoint service
     * for demultiplexing.
     */
    private class MessageProcessor implements Runnable {

        /** The message to be demultiplexed; set once at construction. */
        private final Message msg;

        /**
         * Creates a processor for the given message.
         *
         * @param msg the received message to hand to the endpoint service.
         */
        MessageProcessor(Message msg) {
            this.msg = msg;
        }

        /**
         * Passes the message to the endpoint service's {@code demux} method,
         * logging at {@code FINE} level first when that level is enabled.
         */
        public void run() {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("Demuxing " + msg + " from " + senderURL);
            }

            servletHttpTransport.getEndpointService().demux(msg);
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -