⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpconnection.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     *  Description of the Method     */    private void closeIOs() {        if (inputStream != null) {            try {                inputStream.close();                inputStream = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("could not close inputStream ", ez1);                }            }        }        if (outputStream != null) {            try {                outputStream.close();                outputStream = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Error : could not close outputStream ", ez1);                }            }        }        if (sharedSocket != null) {            try {                sharedSocket.close();                sharedSocket = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Error : could not close socket ", ez1);                }            }        }    }            /**     *  {@inheritDoc}     *     *@param  target  Description of the Parameter     *@return         Description of the Return Value     */    public boolean equals(Object target) {        if (this == target) {            return true;        }                if (null == target) {            return false;        }                if (target instanceof TcpConnection) {            TcpConnection likeMe = (TcpConnection) target;                        return getDestinationAddress().equals(likeMe.getDestinationAddress()) &&            getDestinationPeerID().equals(likeMe.getDestinationPeerID());        }                return false;    }            /**     *  {@inheritDoc}     */    protected void finalize() {        close();    }            /**     *  {@inheritDoc}     *     *@return    Description of the Return Value     */    public int hashCode() {        return getDestinationPeerID().hashCode() + getDestinationAddress().hashCode();    }            /**     *  This is called with "true" when the invoker is about to read some input     *  and is not willing to wait for it to come. This is called with "false"     *  when the invoker is about to wait for a long time for input to become     *  available with a potentialy very long blocking read.     *     *@param  active  Description of the Parameter     */    private void inputActive(boolean active) {        if (active) {            inputStream.setWatchList(ShortCycle);        } else {            inputStream.setWatchList(LongCycle);        }    }            /**     *  {@inheritDoc}     *     * <p/>This is the background Thread. While the connection is active, takes     * messages from the queue and send it.     */    public void run() {        try {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("tcp receive - starts for " + inetAddress.getHostAddress() + ":" + port);            }                        try {                while (isConnected()) {                    if (closed) {                        break;                    }                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("tcp receive - message starts for " + inetAddress.getHostAddress() + ":" + port);                    }                                        // We can stay blocked here for a long time, it's ok.                    MessagePackageHeader header = new MessagePackageHeader(inputStream);                    MimeMediaType msgMime = header.getContentTypeHeader();                    long msglength = header.getContentLengthHeader();                                        // FIXME 20020730 bondolo@jxta.org Do something with content-coding here.                                        if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Message body (" + msglength + ") starts for " + inetAddress.getHostAddress() + ":" + port);                    }                                        // read the message!                    // We have received the header, so, the rest had better                    // come. Turn the short timeout on.                    inputActive(true);                                        Message msg = null;                    try {                        InputStream msgStream = new LimitInputStream(inputStream, msglength, true);                        msg = WireFormatMessageFactory.fromWire(msgStream, msgMime, (MimeMediaType) null);                    } catch (IOException failed) {                        if (LOG.isEnabledFor(Level.WARN)) {                            LOG.warn("Failed reading msg from " + inetAddress.getHostAddress() + ":" + port);                        }                                                throw failed;                    } finally {                        // We can relax again.                        inputActive(false);                    }                                        if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Handing incoming message from " + inetAddress.getHostAddress() + ":" + port + " to EndpointService");                    }                    try {                        // Demux the message for the upper layers                        if (listener != null) {                            listener.demux(msg);                        }                    } catch (Throwable t) {                        if (LOG.isEnabledFor(Level.WARN)) {                            LOG.warn("Failure while endpoint demuxing " + msg, t);                        }                    }                    setLastUsed(System.currentTimeMillis());                }            } catch (InterruptedIOException woken) {                // We have to treat this as fatal since we don't know where                // in the framing the input stream was at.                                closingDueToFailure = true;                                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Error : read() timeout after " + woken.bytesTransferred + " on connection " + inetAddress.getHostAddress() + ":" + port);                }            } catch (EOFException finished) {                // The other side has closed the connection                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("Connection was closed by " + inetAddress.getHostAddress() + ":" + port);                }            } catch (Throwable e) {                closingDueToFailure = true;                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Error on connection " + inetAddress.getHostAddress() + ":" + port, e);                }            } finally {                synchronized( this ) {                    if (!closed) {                        // We need to close the connection down.                        close();                    }                                        recvThread = null;                }            }        } catch (Throwable all) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);            }        }    }            /**     *  Send message to the remote peer.     *     *@param  msg              the message to send.     *@exception  IOException  Description of the Exception     */    public void sendMessage(Message msg) throws IOException {                // socket is a stream, only one writer at a time...        synchronized (writeLock) {            if (closed) {                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("Connection was closed to : " + dstAddress);                }                throw new IOException("Connection was closed to : " + dstAddress);            }                        boolean success = false;            long size = 0;                        try {                // 20020730 bondolo@jxta.org Do something with content-coding here                // serialize the message.                WireFormatMessage serialed = WireFormatMessageFactory.toWire(                msg,                appMsg, (MimeMediaType[]) null);                                // Build the protocol header                // Allocate a buffer to contain the message and the header                MessagePackageHeader header = new MessagePackageHeader();                header.setContentTypeHeader(serialed.getMimeType());                size = serialed.getByteLength();                header.setContentLengthHeader(size);                                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("sendMessage (" + serialed.getByteLength() + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":" + port);                }                                // Write the header and the message.                header.sendToStream(outputStream);                outputStream.flush();                                serialed.sendToStream(outputStream);                outputStream.flush();                                // all done!                success = true;                setLastUsed(System.currentTimeMillis());            } catch (Throwable failure) {                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("tcp send - message send failed for " + inetAddress.getHostAddress() + ":" + port, failure);                }                                closingDueToFailure = true;                close();            }        }    }            /**     *  Description of the Method     */    protected void start() {        recvThread.start();    }            /**     *  Description of the Method     *     *@exception  IOException  Description of the Exception     */    private void startSocket(PeerID id) throws IOException {                sharedSocket.setKeepAlive(true);        int useBufferSize = Math.max(SendBufferSize, sharedSocket.getSendBufferSize());        sharedSocket.setSendBufferSize(useBufferSize);        useBufferSize = Math.max(RecvBufferSize, sharedSocket.getReceiveBufferSize());        sharedSocket.setReceiveBufferSize(useBufferSize);                sharedSocket.setSoLinger(true, LingerDelay);        //                        socket.setTcpNoDelay(true);                outputStream = new WatchedOutputStream(sharedSocket.getOutputStream());        outputStream.setWatchList(ShortCycle);        inputStream = new WatchedInputStream(sharedSocket.getInputStream());        outputStream.setWatchList(LongCycle);                if ((inputStream == null) || (outputStream == null)) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("   failed getting streams.");            }            throw new IOException("Could not get streams");        }        myWelcome = new WelcomeMessage(fullDstAddress, fullDstAddress, id, false);        myWelcome.sendToStream(outputStream);        outputStream.flush();        // The response should arrive shortly or we bail out.        inputActive(true);        itsWelcome = new WelcomeMessage(inputStream);                // Ok, we can wait for messages now.        inputActive(false);        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Hello from " + itsWelcome.getPublicAddress() + " [" + itsWelcome.getPeerID() + "]");        }        recvThread = new Thread(this);        setThreadName();        recvThread.setDaemon(true);    }            /**     *  {@inheritDoc} <p/>     *     *  Implementation for debugging.     *     *@return    Description of the Return Value     */    public String toString() {        return super.toString() + ":" +        ((null != itsWelcome) ? itsWelcome.getPeerID().toString() : "unknown") +        " on address " +        ((null != dstAddress) ? dstAddress.toString() : "unknown");    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -