📄 jtlsoutputstream.java
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.endpoint.tls;import java.io.OutputStream;import java.io.IOException;import java.net.*;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Vector;import java.util.logging.Level;import net.jxta.logging.Logging;import java.util.logging.Logger;import net.jxta.endpoint.ByteArrayMessageElement;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.StringMessageElement;import net.jxta.impl.endpoint.tls.TlsConn.HandshakeState;import net.jxta.impl.util.TimeUtils;/** * Acts as the output for TLS. Accepts ciphertext from TLS and packages it into * messages for sending to the remote. The messages are kept in a retry queue * until the remote peer acknowledges receipt of the message. 
**/
class JTlsOutputStream extends OutputStream {

    /**
     * Logger for this class (a java.util.logging Logger, named after the class).
     **/
    private static final Logger LOG = Logger.getLogger(JTlsOutputStream.class.getName());

    // constants

    /**
     * This maximum is only enforced if we have not heard
     * from the remote for RETRMAXAGE.
     * NOTE(review): RETRMAXAGE is not declared in the visible part of this
     * file -- confirm where it is defined.
     **/
    private static final int MAXRETRQSIZE = 100;

    /**
     * Initial estimated Round Trip Time (one TimeUtils.ASECOND).
     **/
    private static final long initRTT = 1 * TimeUtils.ASECOND;

    /**
     * Constant message element built from JTlsDefs.RETR and the string
     * "TLSRET".
     * NOTE(review): presumably attached to retransmitted messages; its use
     * is not visible in this part of the file -- confirm.
     **/
    private static final MessageElement RETELT = new StringMessageElement(JTlsDefs.RETR, "TLSRET", null);

    /**
     * Retrans window. When reached, we up the RTO.
     **/
    private static final int RWINDOW = 5;

    /**
     * If true then the stream has been closed.
     **/
    private volatile boolean closed = false;

    /**
     * If true then the stream is being closed.
     * It means that it still works completely for all messages already
     * queued, but no new message may be enqueued.
     **/
    private volatile boolean closing = false;

    /**
     * Sequence number of the message we most recently sent out.
     **/
    private volatile int sequenceNumber = 0;

    /**
     * Sequence number of highest sequential ACK.
     **/
    private volatile int maxACK = 0;

    /**
     * Transport we are working for.
     **/
    private TlsTransport tp = null;

    /**
     * Connection we are working for.
     **/
    private TlsConn conn = null;

    /**
     * Retransmitter instance; assigned in the constructor.
     **/
    private Retransmitter retransmitter = null;

    // for retransmission

    /**
     * Average round trip time in milliseconds. Starts at initRTT.
     **/
    private volatile long aveRTT = initRTT;

    /**
     * Number of ACK messages received.
     **/
    private int nACKS = 0;

    /**
     * Retry Time Out measured in milliseconds.
     **/
    private volatile long RTO = 0;

    /**
     * Minimum Retry Timeout measured in milliseconds. Starts at initRTT.
     **/
    private volatile long minRTO = initRTT;

    /**
     * Maximum Retry Timeout measured in milliseconds. Starts at five
     * times initRTT.
     **/
    private volatile long maxRTO = initRTT * 5;

    /**
     * Absolute time in milliseconds of last sequential ACK.
     **/
    private volatile long lastACKTime = 0;

    /**
     * Absolute time in milliseconds of last SACK based retransmit.
**/
    private volatile long sackRetransTime = 0;

    /**
     * The collection of messages available for re-transmission.
     * This object is also used as the monitor for synchronized blocks and
     * notifyAll() calls in close().
     */
    final List<RetrQElt> retrQ = new Vector<RetrQElt>(25, 5);

    // running average of the recipient's Input Queue
    private int nIQTests = 0;
    private int aveIQSize = 0;

    /**
     * Our estimation of the current free space in the remote input queue.
     **/
    private volatile int mrrIQFreeSpace = 0;

    /**
     * Our estimation of the maximum size of the remote input queue.
     **/
    private int rmaxQSize = 0;

    /**
     * Retransmission queue element: one message together with its sequence
     * number and timing bookkeeping.
     **/
    private static class RetrQElt {
        int seqnum; // sequence number of this message.
        long enqueuedAt; // absolute time of original enqueuing.
        volatile Message msg; // the message
        int marked; // has been marked as retransmission
        long sentAt; // when this msg was last transmitted

        /**
         * Creates an element stamped with the current time. The last-sent
         * time starts out equal to the enqueue time and the retransmission
         * mark starts at 0.
         *
         * @param seqnum sequence number of the message
         * @param msg the message to hold
         **/
        public RetrQElt(int seqnum, Message msg) {
            this.seqnum = seqnum;
            this.msg = msg;
            this.enqueuedAt = TimeUtils.timeNow();
            this.sentAt = this.enqueuedAt;
            this.marked = 0;
        }
    }

    /**
     * Creates the output stream for the given transport and connection.
     *
     * @param tp the transport we are working for
     * @param conn the connection we are working for
     **/
    JTlsOutputStream(TlsTransport tp, TlsConn conn) {
        this.conn = conn; // TlsConnection.
        this.tp = tp; // our transport
        this.RTO = minRTO; // initial RTO

        // input free queue size: start by assuming the whole remote queue
        // (20 entries) is free
        this.rmaxQSize = 20;
        this.mrrIQFreeSpace = rmaxQSize;

        // Init last ACK Time to now
        this.lastACKTime = TimeUtils.timeNow();
        this.sackRetransTime = TimeUtils.timeNow();

        // Start retransmission thread
        // NOTE(review): Retransmitter is declared outside the visible part
        // of this file; presumably its constructor starts the thread --
        // confirm.
        this.retransmitter = new Retransmitter();
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Sets the closed flag while holding the lock on this stream, then,
     * holding the lock on the retry queue, wakes every thread waiting on
     * the queue and empties it.
     *
     * <p/>We don't currently support linger.
     **/
    @Override
    public void close() throws IOException {
        synchronized (this) {
            super.close();
            closed = true;
        }
        synchronized (retrQ) {
            // waiters cannot proceed until this block releases the retrQ
            // lock, so they observe the queue already cleared
            retrQ.notifyAll();
            retrQ.clear();
        }
    }

    /**
     * Indicate that we're in the process of closing. To respect the semantics
     * of close()/isClosed(), we do not set the closed flag, yet. Instead, we
     * set the flag "closing", which simply guarantees that no new message
     * will be queued.
     * This, in combination with getSequenceNumber and getMaxAck, and
     * waitQevent, enables fine grain control of the tear down process.
**/
    public void setClosing() {
        synchronized (retrQ) {
            closing = true;
            // NOTE(review): the retry queue is emptied here, although the
            // javadoc of the "closing" flag says already-queued messages
            // still work completely -- confirm this is intended.
            retrQ.clear();
            // wake every thread waiting on the retry queue monitor
            retrQ.notifyAll();
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Wraps the low-order eight bits of {@code c} in a one-byte array
     * and delegates to write(byte[], int, int).
     **/
    @Override
    public void write(int c) throws IOException {
        byte[] a = new byte[1];

        a[0] = (byte) (c & 0xFF);
        write(a, 0, 1);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>We override the write(byte[], offset, length);
     * method which is called by SSLRecord.send(SSLConn conn)
     * via tos.writeTo(conn.sock_out), tos a ByteArrayOutputStream
     * which has buffered the TLS output record in the byte array.
     * The actual call is write(byte[] b, 0, length);
     *
     * <p/>We put this TLS record into a message element for the output
     * pipe to send along.
     *
     * <p/>This is reasonable since in fact, if more than 16K bytes of
     * application data are sent, then the max TLS Record is a little
     * larger than 16K bytes including the TLS overhead.
     *
     * <p/>Therefore, an app. message is N+r TLS Records,
     * Message length = Nx16K + r, N >= 0, r >= 0,
     * N > 0 || r > 0 true.
     **/
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // flag to allow connection closure in finally block
        // Connection can not be closed when holding a lock on this
        boolean closeStale = false;

        // allocate new message
        Message jmsg = new Message();

        try {
            // refuse new data once the stream is closed or closing
            if (closed) {
                throw new IOException("stream is closed");
            }
            if (closing) {
                throw new IOException("stream is being closed");
            }
            // a null buffer is reported as an argument error
            if (b == null) {
                throw new IllegalArgumentException("buffer is null");
            }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -