📄 reliableoutputstream.java
字号:
this.RTO = maxRTO; this.mrrIQFreeSpace = rmaxQSize; this.rttThreshold = rmaxQSize; // Init last ACK Time to now this.lastACKTime = TimeUtils.timeNow(); this.sackRetransTime = TimeUtils.timeNow(); // Attach the flowControl module this.fc = fc; // Update our initial rwindow to reflect fc's initial value this.rwindow = fc.getRwindow(); } /** * {@inheritDoc} */ @Override public void close() throws IOException { flush(); super.close(); localClosed = true; closedAt = TimeUtils.toRelativeTimeMillis(lingerDelay); synchronized (retrQ) { retrQ.notifyAll(); } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Closed."); } } public long getLingerDelay() { return lingerDelay; } public void setLingerDelay(long linger) { if (linger < 0) { throw new IllegalArgumentException("Linger delay may not be negative."); } if (0 == linger) { linger = Long.MAX_VALUE; } lingerDelay = linger; } /** * Return the size of the buffers we are using for accumulating writes. * * @return size of our write buffers. */ public int setSendBufferSize() { return writeBufferSize; } /** * Set the size of the buffers we will use for accumulating writes. * * @param size The desired size of write buffers. * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed. */ public void setSendBufferSize(int size) throws IOException { if (size <= 0) { throw new IllegalArgumentException("Send buffer size may not be <= 0"); } // Flush any existing buffered writes. Then next write will use the new buffer size. synchronized (writeLock) { flushBuffer(); writeBufferSize = size; } } /** * We have received a close request from the remote peer. We must stop * retransmissions immediately. */ public void hardClose() { remoteClosed = true; closedAt = TimeUtils.timeNow(); // Clear the retry queue. Remote side doesn't care. synchronized (retrQ) { retrQ.clear(); retrQ.notifyAll(); } // Clear the write queue. Remote side doesn't care. synchronized (writeLock) { writeCount = 0; writeBuffer = null; writeBufferAge = Long.MAX_VALUE; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("Hard closed."); } } /** * Returns the state of the stream * * @return true if closed */ public boolean isClosed() { return localClosed || remoteClosed; } /** * {@inheritDoc} */ @Override public void flush() throws IOException { synchronized (writeLock) { flushBuffer(); } } /** * {@inheritDoc} */ @Override public void write(int b) throws IOException { write(new byte[] { (byte) b }, 0, 1); } /** * {@inheritDoc} */ @Override public void write(byte b[], int off, int len) throws IOException { synchronized (writeLock) { if (isClosed()) { throw new IOException("stream is closed"); } if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } int current = off; int end = current + len; while (current < end) { if (0 == writeCount) { // No bytes written? We need a new buffer. writeBuffer = new byte[writeBufferSize]; writeBufferAge = TimeUtils.timeNow(); } int remain = end - current; int available = writeBuffer.length - writeCount; int copy = Math.min(available, remain); System.arraycopy(b, current, writeBuffer, writeCount, copy); writeCount += copy; current += copy; if (writeBuffer.length == writeCount) { flushBuffer(); } } } } /** * Flush the internal buffer. {@code writeLock} must have been previously * acquired. * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed. */ private void flushBuffer() throws IOException { if (writeCount > 0) { // send the message try { writeBuffer(writeBuffer, 0, writeCount); } finally { writeCount = 0; writeBuffer = null; writeBufferAge = Long.MAX_VALUE; } } } /** * Write the internal buffer. {@code writeLock} must have been previously * acquired. * * @param b data * @param off the start offset in the data. * @param len the number of bytes to write. * @throws IOException if an I/O error occurs. In particular, an IOException is thrown if the output stream is closed. */ private void writeBuffer(byte[] b, int off, int len) throws IOException { if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } if (len == 0) { return; } if (null == retrThread) { retrThread = new Thread(new Retransmitter(), "JXTA Reliable Retransmiter for " + this); retrThread.setDaemon(true); retrThread.start(); } // allocate new message Message jmsg = new Message(); synchronized (retrQ) { while (true) { if (isClosed()) { throw new IOException("Connection is " + (localClosed ? "closing" : "closed")); } if (retrQ.size() > Math.min(rwindow, mrrIQFreeSpace * 2)) { try { retrQ.wait(1000); } catch (InterruptedException ignored) {// ignored } continue; } break; } int sequenceToUse = sequenceNumber.incrementAndGet(); MessageElement element = new ByteArrayMessageElement(Integer.toString(sequenceToUse), Defs.MIME_TYPE_BLOCK, b, off , len, null); jmsg.addMessageElement(Defs.NAMESPACE, element); RetrQElt retrQel = new RetrQElt(sequenceToUse, jmsg.clone()); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Reliable WRITE : seqn#" + sequenceNumber + " length=" + len); } // place copy on retransmission queue retrQ.add(retrQel); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Retrans Enqueue added seqn#" + sequenceNumber + " retrQ.size()=" + retrQ.size()); } } outgoing.send(jmsg); mrrIQFreeSpace--; // assume we have now taken a slot if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("SENT : seqn#" + sequenceNumber + " length=" + len); } } /** * Serialize a JXTA message as a reliable message. * * <p/>This method bypasses the built-in buffering and ignores the MTU size. * * @param msg message to send * @return message sequence number * @throws IOException if an I/O error occurs */ public int send(Message msg) throws IOException { WireFormatMessage msgSerialized = WireFormatMessageFactory.toWire(msg, Defs.MIME_TYPE_MSG, null); ByteArrayOutputStream baos = new ByteArrayOutputStream((int) msgSerialized.getByteLength()); msgSerialized.sendToStream(baos); baos.close(); byte[] bytes = baos.toByteArray(); synchronized (writeLock) { flushBuffer(); writeBuffer(bytes, 0, bytes.length); return sequenceNumber.get(); } } /** * Gets the maxAck attribute of the ReliableOutputStream object * * @return The maxAck value */ public int getMaxAck() { return maxACK; } /** * Gets the seqNumber attribute of the ReliableOutputStream object * * @return The seqNumber value */ public int getSeqNumber() { return sequenceNumber.get(); } /** * Gets the queueFull attribute of the ReliableOutputStream object * * @return The queueFull value */ protected boolean isQueueFull() { return mrrIQFreeSpace < 1; } /** * Gets the queueEmpty attribute of the ReliableOutputStream object. * * @return {@code true} if the queue is empty otherwise {@code false}. */ public boolean isQueueEmpty() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -