⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliableoutputstream.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        this.RTO = maxRTO;                this.mrrIQFreeSpace = rmaxQSize;        this.rttThreshold = rmaxQSize;                // Init last ACK Time to now        this.lastACKTime = TimeUtils.timeNow();        this.sackRetransTime = TimeUtils.timeNow();                // Attach the flowControl module        this.fc = fc;                // Update our initial rwindow to reflect fc's initial value        this.rwindow = fc.getRwindow();    }        /**     * {@inheritDoc}     */    @Override    public void close() throws IOException {        flush();                super.close();        localClosed = true;        closedAt = TimeUtils.toRelativeTimeMillis(lingerDelay);                synchronized (retrQ) {            retrQ.notifyAll();        }                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("Closed.");        }    }        public long getLingerDelay() {        return lingerDelay;    }        public void setLingerDelay(long linger) {        if (linger < 0) {            throw new IllegalArgumentException("Linger delay may not be negative.");        }                if (0 == linger) {            linger = Long.MAX_VALUE;        }                lingerDelay = linger;    }        /**     * Return the size of the buffers we are using for accumulating writes.     *     * @return size of our write buffers.     */    public int setSendBufferSize() {        return writeBufferSize;    }        /**     * Set the size of the buffers we will use for accumulating writes.     *     * @param size The desired size of write buffers.     * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.     */    public void setSendBufferSize(int size) throws IOException {        if (size <= 0) {            throw new IllegalArgumentException("Send buffer size may not be <= 0");        }                // Flush any existing buffered writes. Then next write will use the new buffer size.        
synchronized (writeLock) {            flushBuffer();            writeBufferSize = size;        }    }        /**     * We have received a close request from the remote peer. We must stop     * retransmissions immediately.     */    public void hardClose() {        remoteClosed = true;        closedAt = TimeUtils.timeNow();                // Clear the retry queue. Remote side doesn't care.        synchronized (retrQ) {            retrQ.clear();            retrQ.notifyAll();        }                // Clear the write queue. Remote side doesn't care.        synchronized (writeLock) {            writeCount = 0;            writeBuffer = null;            writeBufferAge = Long.MAX_VALUE;        }                if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("Hard closed.");        }    }        /**     * Returns the state of the stream     *     * @return true if closed     */    public boolean isClosed() {        return localClosed || remoteClosed;    }        /**     * {@inheritDoc}     */    @Override    public void flush() throws IOException {        synchronized (writeLock) {            flushBuffer();        }    }        /**     * {@inheritDoc}     */    @Override    public void write(int b) throws IOException {        write(new byte[] { (byte) b }, 0, 1);    }        /**     * {@inheritDoc}     */    @Override    public void write(byte b[], int off, int len) throws IOException {        synchronized (writeLock) {            if (isClosed()) {                throw new IOException("stream is closed");            }                        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {                throw new IndexOutOfBoundsException();            }                        if (len == 0) {                return;            }                        int current = off;            int end = current + len;                        while (current < end) {                if (0 == writeCount) {          
          // No bytes written? We need a new buffer.                    writeBuffer = new byte[writeBufferSize];                    writeBufferAge = TimeUtils.timeNow();                }                                int remain = end - current;                                int available = writeBuffer.length - writeCount;                int copy = Math.min(available, remain);                                System.arraycopy(b, current, writeBuffer, writeCount, copy);                writeCount += copy;                current += copy;                                if (writeBuffer.length == writeCount) {                    flushBuffer();                }            }        }    }        /**     * Flush the internal buffer. {@code writeLock} must have been previously     * acquired.     * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.     */    private void flushBuffer() throws IOException {        if (writeCount > 0) {            // send the message            try {                writeBuffer(writeBuffer, 0, writeCount);            } finally {                writeCount = 0;                writeBuffer = null;                writeBufferAge = Long.MAX_VALUE;            }        }    }        /**     * Write the internal buffer. {@code writeLock} must have been previously     * acquired.     *     * @param b data     * @param off  the start offset in the data.     * @param len     the number of bytes to write.     * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed.     
*/    private void writeBuffer(byte[] b, int off, int len) throws IOException {        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {            throw new IndexOutOfBoundsException();        }                if (len == 0) {            return;        }                if (null == retrThread) {            retrThread = new Thread(new Retransmitter(), "JXTA Reliable Retransmiter for " + this);            retrThread.setDaemon(true);            retrThread.start();        }                // allocate new message        Message jmsg = new Message();        synchronized (retrQ) {            while (true) {                if (isClosed()) {                    throw new IOException("Connection is " + (localClosed ? "closing" : "closed"));                }                if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) {                    try {                        retrQ.wait(1000);                    } catch (InterruptedException ignored) {// ignored                    }                    continue;                }                break;            }                        int sequenceToUse = sequenceNumber.incrementAndGet();            MessageElement element = new ByteArrayMessageElement(Integer.toString(sequenceToUse), Defs.MIME_TYPE_BLOCK, b, off                    ,                    len, null);            jmsg.addMessageElement(Defs.NAMESPACE, element);            RetrQElt retrQel = new RetrQElt(sequenceToUse, jmsg.clone());                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len);            }                        // place copy on retransmission queue            retrQ.add(retrQel);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size());            }        }                
outgoing.send(jmsg);        mrrIQFreeSpace--;        // assume we have now taken a slot        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("SENT : seqn#" + sequenceNumber + " length=" + len);        }    }        /**     * Serialize a JXTA message as a reliable message.     *     * <p/>This method bypasses the built-in buffering and ignores the MTU size.     *     * @param msg message to send     * @return message sequence number     * @throws IOException if an I/O error occurs     */    public int send(Message msg) throws IOException {        WireFormatMessage msgSerialized = WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null);        ByteArrayOutputStream baos = new ByteArrayOutputStream((int) msgSerialized.getByteLength());        msgSerialized.sendToStream(baos);        baos.close();        byte[] bytes = baos.toByteArray();                synchronized (writeLock) {            flushBuffer();            writeBuffer(bytes, 0, bytes.length);            return sequenceNumber.get();        }    }        /**     * Gets the maxAck attribute of the ReliableOutputStream object     *     * @return The maxAck value     */    public int getMaxAck() {        return maxACK;    }        /**     * Gets the seqNumber attribute of the ReliableOutputStream object     *     * @return The seqNumber value     */    public int getSeqNumber() {        return sequenceNumber.get();    }        /**     * Gets the queueFull attribute of the ReliableOutputStream object     *     * @return The queueFull value     */    protected boolean isQueueFull() {        return mrrIQFreeSpace < 1;    }        /**     * Gets the queueEmpty attribute of the ReliableOutputStream object.     *     * @return {@code true} if the queue is empty otherwise {@code false}.     */    public boolean isQueueEmpty() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -