⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcpconnection.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        sharedSocket.setSoLinger(true, TcpTransport.LingerDelay);        sharedSocket.setTcpNoDelay(true);        woutputStream = new WatchedOutputStream(sharedSocket.getOutputStream(), TcpTransport.ChunkSize);        woutputStream.setWatchList(proto.ShortCycle);        winputStream = new WatchedInputStream(sharedSocket.getInputStream(), TcpTransport.ChunkSize);        winputStream.setWatchList(proto.LongCycle);        if ((winputStream == null) || (woutputStream == null)) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("   failed getting streams.");            }            throw new IOException("Could not get streams");        }        outputStream = new BufferedOutputStream(woutputStream, TcpTransport.ChunkSize);        inputStream = winputStream;        myWelcome = new WelcomeMessage(fullDstAddress, proto.getPublicAddress(), proto.group.getPeerID(), false);        myWelcome.sendToStream(outputStream);        outputStream.flush();        // The response should arrive shortly or we bail out.        inputActive(true);        itsWelcome = new WelcomeMessage(inputStream);        // Ok, we can wait for messages now.        inputActive(false);        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("startSocket : Hello from " + itsWelcome.getPublicAddress() + " [" + itsWelcome.getPeerID() + "]");        }        recvThread = new Thread(proto.myThreadGroup, this);        setThreadName();        recvThread.setDaemon(true);    }    protected void start() {        recvThread.start();    }    /**     * Send message to the remote peer.     *     *  @param msg  the message to send.     *  @return If <tt>true</tt> the message was sent successfully otherwise <tt>false</tt>.     */    public boolean sendMessage(Message msg) throws IOException {        // socket is a stream, only one writer at a time...        synchronized (writeLock) {            if (closed) {                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("Connection was closed to : " + dstAddress);                }                throw new IOException("Connection was closed to : " + dstAddress);            }            boolean success = false;            long sendBeginTime = 0;            long size = 0;            if (TransportMeterBuildSettings.TRANSPORT_METERING) {                sendBeginTime = System.currentTimeMillis();            }            try {                // 20020730 bondolo@jxta.org Do something with content-coding here                // serialize the message.                WireFormatMessage serialed = WireFormatMessageFactory.toWire(msg, appMsg, (MimeMediaType[]) null);                // Build the protocol header                // Allocate a buffer to contain the message and the header                MessagePackageHeader header = new MessagePackageHeader();                header.setContentTypeHeader(serialed.getMimeType());                size = serialed.getByteLength();                header.setContentLengthHeader(size);                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Sending " + msg + " (" + serialed.getByteLength() + ") to " + dstAddress + " via " + inetAddress.getHostAddress() + ":" + port);                }                header.sendToStream(outputStream);                serialed.sendToStream(outputStream);                outputStream.flush();                // all done!                success = true;                setLastUsed(System.currentTimeMillis());                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.messageSent(initiator, msg, System.currentTimeMillis() - sendBeginTime, size);                }                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Sent " + msg + " successfully via " + inetAddress.getHostAddress() + ":" + port);                }                return true;            } catch (Exception failed) {                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.sendFailure(initiator, msg, System.currentTimeMillis() - sendBeginTime, size);                }                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Message send failed for " + inetAddress.getHostAddress() + ":" + port, failed);                }                closingDueToFailure = true;                close();                IOException failure = new IOException("Failed sending " + msg + " to : " + inetAddress.getHostAddress() + ":" + port);                failure.initCause(failed);                throw failure;            }        }    }    /**     *  {@inheritDoc}     *     * This is the background Thread. While the connection is active, takes     * messages from the queue and send it.     */    public void run() {        long receiveBeginTime = 0;        long size = 0;        try {            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("Starting receiver for " + inetAddress.getHostAddress() + ":" + port);            }            try {                while (isConnected()) {                    if (closed) {                        break;                    }                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Message receive starts for " + inetAddress.getHostAddress() + ":" + port);                    }                    // We can stay blocked here for a long time, it's ok.                    MessagePackageHeader header = new MessagePackageHeader(inputStream);                    if (TransportMeterBuildSettings.TRANSPORT_METERING) {                        receiveBeginTime = System.currentTimeMillis();                    }                    MimeMediaType msgMime = header.getContentTypeHeader();                    long msglength = header.getContentLengthHeader();                    // FIXME 20020730 bondolo@jxta.org Do something with content-coding here.                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("tcp receive - message body (" + msglength + ") starts for " + inetAddress.getHostAddress() + ":" + port);                    }                    // read the message!                    // We have received the header, so, the rest had better                    // come. Turn the short timeout on.                    inputActive(true);                    Message msg = null;                    try {                        msg = WireFormatMessageFactory.fromWire(new LimitInputStream(inputStream, msglength, true), msgMime, (MimeMediaType) null);                    } catch (IOException failed) {                        if (LOG.isEnabledFor(Level.INFO)) {                            LOG.info("tcp receive - failed reading msg from " + inetAddress.getHostAddress() + ":" + port);                            // LOG.error(sharedSocket.toString() +                            // "\tbound " + sharedSocket.isBound() +                            // "\tclosed " + sharedSocket.isClosed() +                            // "\tconntected " + sharedSocket.isConnected() +                            // "\tisInputShutDown " + sharedSocket.isInputShutdown());                        }                        throw failed;                    } finally {                        // We can relax again.                        inputActive(false);                    }                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Handing " + msg + " from " + inetAddress.getHostAddress() + ":" + port + " to EndpointService");                    }                    if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                        transportBindingMeter.messageReceived(initiator, msg, System.currentTimeMillis() - receiveBeginTime, msglength);                    }                    // Demux the message for the upper layers.                    proto.endpoint.demux(msg);                    setLastUsed(System.currentTimeMillis());                }            } catch (InterruptedIOException woken) {                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size);                }                // We have to treat this as fatal since we don't know where                // in the framing the input stream was at. This should have                // been handled below.                closingDueToFailure = true;                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn(                        "tcp receive - Error : read() timeout after " + woken.bytesTransferred + " on connection " + inetAddress.getHostAddress()                        + ":" + port);                }            } catch (EOFException finished) {                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size);                }                // The other side has closed the connection                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("tcp receive - Connection was closed by " + inetAddress.getHostAddress() + ":" + port);                }            } catch (SocketException finished) {                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size);                }                closingDueToFailure = true;                // The other side has closed the connection                if (LOG.isEnabledFor(Level.INFO)) {                    LOG.info("tcp receive - Connection was closed by " + inetAddress.getHostAddress() + ":" + port);                }            } catch (Throwable e) {                if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                    transportBindingMeter.receiveFailure(initiator, System.currentTimeMillis() - receiveBeginTime, size);                }                closingDueToFailure = true;                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("tcp receive - Error on connection " + inetAddress.getHostAddress() + ":" + port, e);                }            } finally {                if (!closed) {                    // We need to close the connection down.                    close();                }            }        } catch (Throwable all) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Uncaught Throwable in thread :" + Thread.currentThread().getName(), all);            }        } finally {            recvThread = null;        }    }    private void closeIOs() {        if (inputStream != null) {            try {                inputStream.close();                inputStream = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("could not close inputStream ", ez1);                }            }        }        if (outputStream != null) {            try {                outputStream.close();                outputStream = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Error : could not close outputStream ", ez1);                }            }        }        if (sharedSocket != null) {            try {                sharedSocket.close();                sharedSocket = null;            } catch (Exception ez1) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Error : could not close socket ", ez1);                }            }        }    }    /**     *  Soft close of the connection. Messages can no longer be sent, but any     *  in the queue will be flushed.     */    public synchronized void close() {        if (LOG.isEnabledFor(Level.INFO)) {            LOG.info( (closingDueToFailure ? "Failure" : "Normal") +                     " close (open " + TimeUtils.toRelativeTimeMillis( TimeUtils.timeNow(), firstUsed ) +                    "ms) of socket to : " + dstAddress + " / " + (inetAddress != null ? inetAddress.getHostAddress() : "UNKNOWN" )+ ":"                    + port);            if (LOG.isEnabledFor(Level.DEBUG) && closingDueToFailure) {                LOG.debug("stack trace", new Throwable("stack trace"));            }        }        if (!closed) {            setLastUsed(0); // we idle now. Way idle.            closeIOs();            closed = true;            Thread temp = recvThread;            if (temp != null) {                temp.interrupt();            }            if (TransportMeterBuildSettings.TRANSPORT_METERING && (transportBindingMeter != null)) {                if (closingDueToFailure) {                    transportBindingMeter.connectionDropped(initiator, System.currentTimeMillis() - connectionBegunTime);                } else {                    transportBindingMeter.connectionClosed(initiator, System.currentTimeMillis() - connectionBegunTime);                }            }            // socket closing happens in the shutdown of recvThread        }    }    /**     *  return the current connection status.     *     *  @param true if there is an active connection to the remote peer,     *  otherwise false.     *     */    public boolean isConnected() {        return ((recvThread != null) && (!closed));    }    /**     *  Return the absolute time in milliseconds at which this Connection was last used.     *     *  @return absolute time in milliseconds.     */    public long getLastUsed() {        return lastUsed;    }    /**     *  Set the last used time for this connection in absolute milliseconds.     *     *  @param time absolute time in milliseconds.     */    private void setLastUsed(long time) {        lastUsed = time;    }    TransportBindingMeter getTransportBindingMeter() {        return transportBindingMeter;    }    /**     * This is called with "true" when the invoker is about to read some     * input and is not willing to wait for it to come.     * This is called with "false" when the invoker is about to wait for     * a long time for input to become available with a potentialy very long     * blocking read.     */    private void inputActive(boolean active) {        if (active) {            winputStream.setWatchList(proto.ShortCycle);        } else {            winputStream.setWatchList(proto.LongCycle);        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -