📄 jtlsinputstream.java
字号:
/************************************************************************
*
* $Id: JTlsInputStream.java,v 1.2 2002/03/04 21:42:58 echtcherbina Exp $
*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*********************************************************************************/
package net.jxta.impl.endpoint.tls;
import COM.claymoresystems.ptls.SSLSocket;
import COM.claymoresystems.ptls.SSLDebug;
import net.jxta.endpoint.*;
import net.jxta.impl.endpoint.MessageImpl;
import net.jxta.impl.endpoint.MessageElementImpl;
import net.jxta.impl.endpoint.MessageWireFormat;
import net.jxta.impl.endpoint.MessageWireFormatBinary;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.util.Vector;
import java.util.Enumeration;
import org.apache.log4j.Category; import org.apache.log4j.Priority;
import java.lang.Thread;
public class JTlsInputStream extends InputStream {
private static final Category LOG = Category.getInstance(JTlsInputStream.class.getName());
TlsConn conn;
// PureTLS will do its raw reads off of this InputStream
// Here, we will have queued up the payload of TLS message
// elements to be passed to TLS code as TLS Records.
private boolean tlsInputDebug = false;
private JTlsRecord jtrec = null;
private MessageWireFormat wireIn = null; // for reading the jxta binary input
private int sequenceNumber = 0;
private int mrrSequenceNumber = 0;
private Vector inputElements = new Vector(10, 1);
private InputStream plain_in = null;
private InputStream netIn = null; // MUST BE FIXED for endpoint reading
static private int MAXQUEUESIZE = 25;
Vector inputQueue = new Vector(MAXQUEUESIZE); // For incoming messages.
private static final String ACKKEY = "ack=";
private static final String SACKKEY = "sack=";
public JTlsInputStream(TlsConn conn) {
this.conn = conn;
jtrec = new JTlsRecord();
wireIn = new MessageWireFormatBinary(JTlsDefs.MTYPE);
// 1 <= seq# <= maxint, monotonically increasing
// Incremented before compare.
sequenceNumber = 0;
// Most Recently Received Msg sequence number
mrrSequenceNumber = 0;
// must be explicity set after SSL Socket is opened
plain_in = null;
}
// protected accessor for sequence number
int getSequenceNumber() {
return sequenceNumber;
}
// Our input queue max size
int getMaxIQSize() {
return MAXQUEUESIZE;
}
// set plain text input
public void setPlaintextInputStream(SSLSocket s)
{
plain_in = s.getInputStream();
}
/**
* Code to read a binary message from plain text input stream
* Data has been decrypted.
*/
// Called from the TlsConn read thread.
public int readMessage(Message jmsg)
{
try {
wireIn.readMessage(plain_in, jmsg);
} catch (IOException iox) {
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("IOException in JTlsInputStream.readMessage:\n" +
" " + iox.getMessage());
return -1;
}
return 0;
}
// rcvdACK:
// See if this is an ACK
// name = jxtatls:ack=<sequence number>
private int rcvdACK(MessageElement elt)
{
String[] names = MessageElement.parseName(elt.getName());
String namespace = names[0];
String ackStr = names[1];
// look for ack=
int i = ackStr.indexOf(ACKKEY);
if (i != -1) {
Integer j = new Integer(ackStr.substring(i + 4));
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("ACK RCVD, SEQN = " + j.intValue());
return j.intValue();
} else {
return 0;
}
}
// trigger a retransmission if we have a hole in the received
// message element Q.
// WE MAY NEED A TIMER HERE to prevent too many
private int holeStart = 0,
holeEnd = 0;
private void triggerRetransmission(int seqnum)
{
// received element leaves a gap
// Note: seqnum is no longer in our inputQ
if (seqnum > mrrSequenceNumber + 1) {
// redundancy check
if (holeStart == mrrSequenceNumber && ((holeEnd + 1)== seqnum)) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TRIGGER ACK: Ignore duplicate hole [" + mrrSequenceNumber +
", " + seqnum + "]");
}
// Same hole: keep end in sequence
holeEnd = seqnum;
return;
} else {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TRIGGER ACK: Input hole [" + mrrSequenceNumber +
", " + seqnum + "]");
}
String sackList = SACKKEY + mrrSequenceNumber + "," + seqnum;
sendACK(mrrSequenceNumber, sackList);
// remember to prevent duplicates
holeStart = mrrSequenceNumber;
holeEnd = seqnum;
}
}
}
static final private byte[] ACKText = {
(byte)'T', (byte)'L', (byte)'S', (byte)'A', (byte)'C', (byte)'K'
};
// Send an ACK for received in sequence messages
// Names == jxtatls:ack=<seqnum>
//
// static final boolean ACKON = true;
//
// Do selective acknowledgment which is backwards
// compatible to ack max in sequence and queued message.
// Add another element, jxtatls:sack
private void sendACK(int oldSeqn)
{
// find maximum iQElt j, such that
// sequenceNumber + 1, ..., sequenceNumber + j
// are consecutive.
// Note: We always ack the most recently read message.
int ackSeqn = mrrSequenceNumber;
// Only ack if unacked msg in queue
// NOTE: We just removed a message from the queue
// that must be acked with sequenceNumber.
int nToAck = 0;
// All messages in input Queue:
// seq1,seq2,..,seqN
String selectedAckList;
if (oldSeqn != 0) {
// Old message arrived. ACK just in case
selectedAckList = SACKKEY + oldSeqn + "," + ackSeqn;
} else {
selectedAckList = SACKKEY + ackSeqn;
}
// See what is in out input queue
for (int j = 0; j < inputQueue.size(); j++) {
IQElt iQ = (IQElt)inputQueue.elementAt(j);
// add to sack list
selectedAckList += "," + iQ.seqnum;
if (iQ.seqnum == ackSeqn + 1) {
// Max consecutive seq number
ackSeqn = iQ.seqnum;
}
// See if we have ackd this one
if (!iQ.ackd) {
iQ.ackd = true;
nToAck =+ 1;
}
}
// PERMIT DUPLICATE ACKS. Just a list and one small message.
sendACK(ackSeqn, selectedAckList);
}
private void sendACK(int seqnum, String sackList)
{
String names = JTlsDefs.TLSNameSpace + ":" + ACKKEY + seqnum;
byte[] data = (byte[])(sackList == null ? ACKText : sackList.getBytes());
Message ACKMsg = new MessageImpl();
MessageElement elt = ACKMsg.newMessageElement(names,
null,
data,
0,
data.length);
ACKMsg.addElement(elt);
try {
conn.transport.sendToRemoteTls(conn.destAddr, ACKMsg);
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info ("SENT ACK, SEQN = " + seqnum);
if (sackList != null)
LOG.info (" SACK, SEQN = " + sackList);
}
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.INFO))
LOG.info ("sendACK caught IOException:",e);
}
}
// An input queue element which breaks out a
// received message in enqueueMessage().
private class IQElt {
int seqnum;
Message msg;
MessageElement elt;
boolean ackd;
}
// We have an iQElt:
//
// Two cases:
//
// o Expected sequence number
// We process the data
//
// o If the sequence number > the next expected
// sequence number, then this element is out of order
// and we queue it for latter access.
//
//
private MessageElement validateElement(IQElt iQ)
{
if (iQ.seqnum > sequenceNumber) {
// Out of order. Place elt into vector queue.
//PDA requirement 18.02.2002
// Vector.add -> Vector.addElement
// inputElements.add(iQ.elt);
inputElements.addElement(iQ.elt);
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("validateElement: queue elt: seq# = " + iQ.seqnum + " expected = " +
sequenceNumber);
}
// Could be an earlier packet was lost.
// Send an ACK to trigger retransmission
triggerRetransmission(iQ.seqnum);
return null;
} else if (iQ.seqnum < sequenceNumber) {
// duplicates should not make it but just in case
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("validateElement: duplicate, seq# = " + iQ.seqnum +
" expected = " + sequenceNumber);
}
return null;
}
// found expected element
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("validateElement: elt in sequence = " + iQ.seqnum);
}
return iQ.elt;
}
// Look for elt with expected sequence number
// in our inputElements vector
private MessageElement getVectorElement()
{
int size = inputElements.size();
for (int i = 0; i < size; ++i) {
MessageElement elt = (MessageElement)inputElements.elementAt(i);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -