📄 jtlsoutputstream.java
字号:
/************************************************************************
*
* $Id: JTlsOutputStream.java,v 1.2 2002/03/04 21:42:58 echtcherbina Exp $
*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*********************************************************************************/
package net.jxta.impl.endpoint.tls;
import COM.claymoresystems.ptls.SSLSocket;
import COM.claymoresystems.ptls.SSLDebug;
import net.jxta.endpoint.*;
import net.jxta.impl.endpoint.MessageImpl;
import net.jxta.impl.endpoint.MessageElementImpl;
import net.jxta.impl.endpoint.MessageWireFormat;
import net.jxta.impl.endpoint.MessageWireFormatBinary;
import org.apache.log4j.Priority;
import org.apache.log4j.Category;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.Vector;
import java.net.*;
public class JTlsOutputStream extends OutputStream {
// Log4j category named after this class.
private static final Category LOG = Category.getInstance(JTlsOutputStream.class.getName());
// Message element created by the most recent write(byte[], int, int) call.
private MessageElement sentMelt = null;
// Wire format used by writeMessage(); re-created in setupJxtaMsgs().
private MessageWireFormat wFout = new MessageWireFormatBinary(JTlsDefs.MTYPE);
// Sequence number given to the next TLS record; setupJxtaMsgs() sets it to 1.
private int sequenceNumber = 0;
// Remote TLS endpoint address (set by the constructor).
private EndpointAddress destAddr = null;
// Transport that write() hands outgoing messages to (set by the constructor).
private TlsTransport tp = null;
// TLS connection passed to the constructor.
private TlsConn conn = null;
// for retransmission
// Running average round-trip time in milliseconds, updated by calcRTT().
private long aveRTT = 0;
// Number of RTT samples folded into aveRTT so far.
private int nACKS = 0;
// Retransmission timeout and its lower/upper clamps, in milliseconds (see calcRTT()).
long RTO = 0, minRTO = 0, maxRTO = 0;
// Set to the construction time by the constructor.
// NOTE(review): presumably refreshed whenever an ACK arrives -- confirm in the ACK handling code.
private long lastACKTime = 0;
// constants
// Initial RTT estimate in milliseconds (1 second).
private static final long initRTT = 1000;
// One minute in milliseconds.
private static final long ONEMINUTE = 60000;
private static final int RETRMAXAGE = 10; // minutes
// This maximum is only enforced if we have not heard
// from the remote for RETRMAXAGE.
private static final int MAXRETRQSIZE = 100;
// running average of the recipient's input queue
private int nIQTests = 0;
private int aveIQSize = 0;
// Remote input-queue free space consulted by write(); initialized from
// conn.jin.getMaxIQSize() in the constructor.
// NOTE(review): presumably the "most recently reported" value -- confirm where it is updated.
private int mrrIQFreeSpace = 0;
// Queue-size bound that write() compares against; initialized to the same
// value as mrrIQFreeSpace in the constructor.
private int rmaxQSize = 0;
// Retrans window. When reached, we up the RTO.
private static final int RWINDOW = 5;
// Retransmitter instance created by the constructor.
Retransmitter retrThread = null;
// NOTE(review): not used in the visible part of the file; presumably the time of
// the last SACK-triggered retransmission -- confirm.
long sackRetransTime = 0;
static final boolean testRetrans = false; // compile time debug flag
public JTlsOutputStream(TlsConn conn, TlsTransport tp, EndpointAddress destAddr) {
setupJxtaMsgs(); // for jxta message
this.conn = conn; // TlsConnection.
this.tp = tp; // our transport
this.destAddr = destAddr; // remote TLS endpoint address
this.aveRTT = initRTT; // 1 second initially
this.minRTO = initRTT; // 1 second
this.RTO = this.minRTO; // the same
this.maxRTO = 5*minRTO; // 5 seconds
// input free queue size
this.mrrIQFreeSpace = conn.jin.getMaxIQSize();
this.rmaxQSize = this.mrrIQFreeSpace;
// Init last ACK Time to now
this.lastACKTime = System.currentTimeMillis();
// Start retransmission thread
this.retrThread = new Retransmitter();
}
/**
 * Resets the per-stream JXTA message state: a fresh binary wire format for
 * plaintext handed to TLS, sequence numbering restarted at 1, and no
 * last-sent message element.
 */
private void setupJxtaMsgs() {
    // for plaintext to tls
    this.wFout = new MessageWireFormatBinary(JTlsDefs.MTYPE);
    this.sequenceNumber = 1;
    this.sentMelt = null;
}
// We write our plaintext to this stream. It stays null until
// setPlaintextOutputStream() is called.
// Note: See SSLHandshake.handshake(). It is there that,
// after the handshake,
// SSLConn.sock_out_external = new SSLOutputStream(conn);
// plaintext_out is an instance of the above.
private OutputStream plaintext_out = null;
/**
 * Remembers the output stream of the given TLS socket as the stream that
 * plaintext is written to.
 *
 * @param s the TLS socket whose output stream is used
 */
public void setPlaintextOutputStream(SSLSocket s) {
    final OutputStream socketOut = s.getOutputStream();
    this.plaintext_out = socketOut;
}
/**
 * Returns the stream plaintext is written to.
 *
 * @return the stream stored by setPlaintextOutputStream(), or null if it
 *         has not been called yet
 */
public OutputStream getPlaintextOutputStream() {
    return this.plaintext_out;
}
// Write a binary message to TLS for encryption:
// We get its length and set the output size to
// enable buffering of elements.
//
// Again note: plaintext_out writes to SSLOutputStream
// which in turn encrypts the data, resulting in a call
// to write(byte[], offset, length) below.
//
// When this call returns the entire message has been written
// by the call to write(..).
// Buffer size, in bytes, of the BufferedOutputStream that writeMessage()
// places in front of plaintext_out.
static final int BOSIZE = 16000;
/**
 * Serializes a message in the binary wire format and writes it, through a
 * BOSIZE-byte buffer, to the plaintext stream. The buffer is flushed before
 * returning; the underlying stream is left open.
 *
 * @param wmsg the message to write
 * @throws IOException if the plaintext stream has not been set yet, or if
 *         serializing or writing the message fails
 */
public synchronized void writeMessage(Message wmsg)
throws IOException
{
    // Fail with the declared exception type rather than a
    // NullPointerException that could surface as late as flush().
    final OutputStream out = plaintext_out;
    if (out == null) {
        throw new IOException("TLS plaintext output stream has not been set");
    }
    BufferedOutputStream bo = new BufferedOutputStream(out, BOSIZE);
    wFout.writeMessage(bo, wmsg);
    // Flush only: closing bo would also close the shared plaintext stream.
    bo.flush();
}
// SSLOutputStream sends a series of TLS Records which are
// the encryption of the message. Thus, AppOutputSize is
// only a lower bound for total encrypted bytes sent
// for the message, and always an UNDER ESTIMATE of the total
// bytes being sent because we do not know the TLS overhead in
// advance.
// Here we override OutputStream's methods.
// These are called AFTER TLS has encrypted to send
// TLS records.
/**
 * Writes a single byte by delegating to write(byte[], int, int) with a
 * one-element array holding the low eight bits of the argument.
 *
 * @param c the byte to write (only the low eight bits are used)
 * @throws IOException if the delegated write fails
 */
public void write(int c) throws IOException {
    final byte[] single = { (byte) (c & 0xFF) };
    write(single, 0, 1);
}
// We override the write(byte[], offset, length);
// method which is called by SSLRecord.send(SSLConn conn)
// via tos.writeTo(conn.sock_out), tos a ByteArrayOutputStream
// which has buffered the TLS output record in the byte array.
// The actual call is write(byte[] b, 0, length);
//
// We put this TLS record into a message element for the output
// pipe to send along.
//
// With the new ACK scheme we now enforce One TLS Record/message.
//
// This is reasonable since in fact, if more than 16K bytes of
// application data are sent, then the max TLS Record is a little
// larger than 16K bytes including the TLS overhead.
//
// Therefore, an app. message is N+r TLS Records,
// Message length = Nx16K + r, N >= 0, r >= 0,
// N > 0 || r > 0 true.
/**
 * Wraps one TLS record in a JXTA message, queues a copy for retransmission
 * and sends the message to the remote TLS endpoint.
 *
 * The record bytes are copied first, so the caller may reuse its buffer.
 * The element is named {@code JTlsDefs.TLSNameSpace + ":" + sequenceNumber}.
 * Before sending, the method waits in 60 ms steps (bounded, see below) while
 * the remote input queue looks nearly full or the retransmission queue has
 * outgrown it. The sequence number is advanced only after a successful send.
 *
 * If the thread is interrupted while waiting, the wait continues as before
 * and the interrupt status is restored when the method exits, so callers
 * can still observe it.
 *
 * @param b      buffer holding the TLS record
 * @param offset index of the first record byte in {@code b}
 * @param length number of record bytes
 * @throws IOException if sending the message fails
 */
synchronized public void write(byte[] b, int offset, int length)
throws IOException
{
    // Copy the data since it will be queued, and caller may
    // overwrite the same byte[] buffer.
    byte[] data = new byte[length];
    System.arraycopy(b, offset, data, 0, length);

    // Name == jxtatls:<sequence number>
    String names = JTlsDefs.TLSNameSpace + ":" + sequenceNumber;

    // allocate new message
    Message jmsg = new MessageImpl();

    // add to our outgoing message elements
    sentMelt = jmsg.newMessageElement(names,
                                      null,
                                      data,
                                      0,
                                      length);

    // add TLS record as element
    jmsg.addElement(sentMelt);

    // (1) See if the most recent remote input queue size is close to
    //     its maximum input queue size.
    //     Send only if at least 20% or more of the queue is free.
    // (2) Also, if our retransQ is larger than the remote's inputQ,
    //     wait until we've received an ack.
    //     We assume some msgs are in transit or the remote system buffers.
    //     We do not want to overrun the receiver.
    // (3) We need to release from the loop because of possible deadlocks.
    //     EG: retrQ.size() == 0 and mrrIQFreeSpace forces looping
    //     forever because the most recent SACK cleared it, and the receiver
    //     is waiting for more data.

    // max of 200ms wait
    int maxwait = (int)(aveRTT < 200 ? aveRTT : 200);
    int waitCt = maxwait/60;       // iterations to wait (max 3)
    if (waitCt < 1) waitCt = 1;    // min 1

    // Set when sleep() is interrupted; the status is restored in the
    // finally block so that it is not lost, without disturbing the send.
    boolean interrupted = false;
    try {
        int i = 0;
        while (mrrIQFreeSpace < rmaxQSize/5 || retrQ.size() > rmaxQSize) {

            // see if max. wait has arrived.
            if (i++ == waitCt) {
                if (LOG.isEnabledFor(Priority.INFO)) {
                    LOG.info("write() wait for ACK, maxwait timer expired");
                }
                break;
            }

            if (LOG.isEnabledFor(Priority.INFO)) {
                LOG.info("write() wait for ACK, remote IQ size = " + mrrIQFreeSpace);
                LOG.info(" MIN to continue = " + rmaxQSize/5);
                LOG.info(" retQ.size() = " + retrQ.size());
            }

            // Less than 20% free queue space is left. Wait.
            try {
                Thread.sleep(60);
            } catch (InterruptedException ex) {
                interrupted = true;
            }

            if (LOG.isEnabledFor(Priority.INFO)) {
                LOG.info("write() woke up, remote IQ size = " + mrrIQFreeSpace);
            }
        }

        // place copy on retransmission queue
        retransEnqueue(sequenceNumber, (Message)jmsg.clone());

        // Here we will send the message to the transport
        tp.sendToRemoteTls(this.destAddr, jmsg);

        if (LOG.isEnabledFor(Priority.INFO)) {
            LOG.info("TLS!! TLS Record sent, len = " + length + " Seq# = " +
                     sequenceNumber);
            // hex dump first 5 records
            if (sequenceNumber < 6) {
                String hex = JTlsUtil.toHex(b, offset, length);
                LOG.info("HexDump:\n" + hex);
            }
        }

        // next sequence number
        sequenceNumber += 1;
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
// retrans queue element
/**
 * One entry of the retransmission queue: a message, its sequence number,
 * the time it was queued and a flag that starts out false.
 */
private class RetrQElt {
    /** Sequence number of the queued message. */
    int seqnum;
    /** System.currentTimeMillis() at the moment this entry was created. */
    long enqueuedAt;
    /** The queued message. */
    Message msg;
    /** Initially false. NOTE(review): its use is outside the visible code. */
    boolean marked;

    /**
     * Creates an unmarked entry stamped with the current time.
     *
     * @param seqnum sequence number of the message
     * @param msg    the message to keep for retransmission
     */
    public RetrQElt(int seqnum, Message msg) {
        this.marked = false;
        this.enqueuedAt = System.currentTimeMillis();
        this.msg = msg;
        this.seqnum = seqnum;
    }
}
// Retransmission queue; retransEnqueue() appends RetrQElt entries to it.
// Initial capacity 10, capacity increment 1.
Vector retrQ = new Vector(10, 1);
// place in retransmission queue
/**
 * Appends a new entry for the given message to the retransmission queue
 * and logs the resulting queue size at INFO level.
 *
 * @param seqN sequence number of the message
 * @param msg  the message to keep for retransmission
 */
synchronized private void retransEnqueue(int seqN, Message msg) {
    // PDA requirement 19.02.2002: Vector.addElement is used instead of Vector.add.
    retrQ.addElement(new RetrQElt(seqN, msg));

    if (LOG.isEnabledFor(Priority.INFO)) {
        LOG.info("TLS!! retrans Enqueue, size = " + retrQ.size() +
                 ", added seq#" + seqN);
    }
}
// get the lifetime of the first entry in minutes
/**
 * Returns how long the first entry of the retransmission queue has been
 * queued, in whole minutes.
 *
 * @return the age of the entry at index 0 in minutes (rounded down), or 0
 *         when the queue is empty
 */
private int retrQfirstEntryAgeMins() {
    if (retrQ.isEmpty()) {
        return 0;
    }
    final RetrQElt head = (RetrQElt) retrQ.elementAt(0);
    final long ageMillis = System.currentTimeMillis() - head.enqueuedAt;
    return (int) (ageMillis / ONEMINUTE);
}
/**
 * Folds one round-trip sample into the running average RTT and recomputes
 * the retransmission timeout.
 *
 * The sample is the time elapsed since {@code enqueuedAt}. aveRTT is the
 * arithmetic mean of all samples so far, and RTO is 2.5 x aveRTT clamped to
 * [minRTO, maxRTO].
 *
 * @param enqueuedAt System.currentTimeMillis() value recorded when the
 *        acknowledged message was queued
 */
private void calcRTT(long enqueuedAt)
{
    long dt = System.currentTimeMillis() - enqueuedAt;
    // The wall clock can stand still or step backwards; never feed a zero
    // or negative sample into the average.
    if (dt <= 0) dt = 1;

    int n = nACKS;
    nACKS += 1;
    aveRTT = ((n * aveRTT) + dt)/(nACKS);

    // Set retransmission time out: 2.5 x RTT
    RTO = (aveRTT << 1) + (aveRTT >> 1);

    // Enforce a min/max
    if (RTO < minRTO) {
        RTO = minRTO;
    } else if (RTO > maxRTO) {
        RTO = maxRTO;
    }

    if (LOG.isEnabledFor(Priority.INFO))
        LOG.info("TLS!! RTT = " + dt + " aveRTT = " + aveRTT + "ms" +
                 " RTO = " + RTO);
}
private Vector sortSackList(String slist)
{
Vector v = new Vector(1, 1);
while (slist != null) {
Integer i_seqnum = new Integer(0);
int i1 = slist.indexOf(",");
if (i1 == -1) {
// last sequence number
i_seqnum = new Integer(slist);
slist = null;
} else {
// extract integer
String s = slist.substring(0, i1);
i_seqnum = new Integer(s);
// "iJ,iJ+1,...,iN" <- "iJ+1, ...,iN"
slist = slist.substring(i1+1);
}
// sort vector
boolean inserted = false;
for (int i = 0; i < v.size(); ++i) {
//PDA requirement 18.02.2002
// inteface Comparable did not exist in jdk 1.1.8
// if (i_seqnum.compareTo((Integer)v.elementAt(i)) < 0) {
// v.insertElementAt(i_seqnum, i);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -