📄 jtlsinputstream.java
字号:
int seqN = JTlsUtil.getSequenceNumber(elt);
if (seqN == sequenceNumber) {
// remove element from vector
inputElements.removeElementAt(i);
return elt;
}
}
return null;
}
// We are called from TlsManager.processIncomingMessage(..);
synchronized public void queueIncomingMessage(Message msg)
{
// Get message sequence number and note if an ACK
// We can process acks because we do not queue them.
int i = 0;
int ackSeqn = 0;
int msgSeqn = 0;
Enumeration e = ((MessageImpl) msg).getElementsInFifoOrder();
MessageElement elt = null;
while (e.hasMoreElements()) {
elt = (MessageElement)e.nextElement();
ackSeqn = rcvdACK(elt);
// See if we have an ACK
if (ackSeqn != 0) {
// See if this is a SACK
byte[] data = elt.getBytesOffset();
String sackStr = new String(data);
String sackList = null;
if (sackStr.indexOf(SACKKEY) != -1) {
// Extract SAC List
sackList = new String(data, SACKKEY.length(),
data.length - SACKKEY.length());
}
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("SACKLIST = " + sackList);
}
// take care of the ACK here;
conn.jout.ackReceived(ackSeqn, sackList);
msg = null;
// wake up reader
notifyAll();
return;
}
// OK look for jxta message
try {
msgSeqn = JTlsUtil.getSequenceNumber(elt);
if (msgSeqn != 0) {
// see if this is a duplicate
if (msgSeqn < mrrSequenceNumber) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("RCVD DUPLICATE MSG: Discard seq# " + msgSeqn);
}
// force discard.
msgSeqn = -1;
}
break;
}
} catch (NumberFormatException n) {
// Not a TLS element
continue;
}
}
// Make sure it is for us
if (msgSeqn <= 0) {
if (LOG.isEnabledFor(Priority.INFO) && msgSeqn == 0) {
LOG.info("TLS!! Received DUPLICATE or NON-TLS msg");
}
// jump start GC
msg = null;
// run other threads
notifyAll();
// Do not enqueue
return;
}
// OK we must inqueue:
// Wait until someone dequeues if we are at the size limit
while (inputQueue.size() >= MAXQUEUESIZE) {
try {
if (LOG.isEnabledFor(Priority.INFO) && (i++ % 10) == 0) {
LOG.info("NotifyAll(): enqueue WAIT, size = " + inputQueue.size());
}
// Wake up dequeuers
notifyAll();
wait();
} catch (InterruptedException iex) {
// just continue
}
}
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("QUEUE incoming message:");
}
// This is a FIFO - Vector is synchronized
// 1. Do not add duplicate messages
// 2. Store in increasing sequence nos.
i = inputQueue.size();
int qIndex = -1;
boolean duplicate = false;
for (int j = 0; j < i; j++) {
IQElt iq = (IQElt)inputQueue.elementAt(j);
if (msgSeqn < iq.seqnum) {
qIndex = j;
break;
} else if (msgSeqn == iq.seqnum) {
duplicate = true;
break;
}
}
// Keep non-duplicates
// Note: sequenceNumber + 1 is our next expected
// sequence number.
if (!duplicate && msgSeqn > mrrSequenceNumber) {
IQElt iq = new IQElt();
iq.seqnum = msgSeqn;
iq.msg = msg;
iq.elt = elt;
iq.ackd = false;
// append to input queue if qIndex == -1
if (qIndex == -1) qIndex = inputQueue.size();
//PDA requirement 18.02.2002
// Vector.add -> Vector.insertElementAt
// inputQueue.add(qIndex, iq);
inputQueue.insertElementAt(iq, qIndex);
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("Enqueued msg with seq# " + msgSeqn +
" At index " + qIndex);
}
} else {
// just toss
msg = null;
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! Discarded duplicate msg, seq# = " + msgSeqn);
}
}
// Let dequeuers read
notifyAll();
if (LOG.isEnabledFor(Priority.INFO) &&
SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
LOG.info("NotifyAll(): N TLS Records queued = " + inputQueue.size());
}
}
// Dequeue is done in local_read(..) just below.
// Called from synchronized local_read(..)
private IQElt dequeueMessage()
{
IQElt iQ = null;
// Wait for incoming message here
int wct = 0;
while (inputQueue.size() == 0) {
try {
wct += 1;
// Wake up enqueuer thread
notifyAll();
wait();
} catch (InterruptedException e) {
// just continue
}
}
if (inputQueue.size() > 0) {
iQ = (IQElt)inputQueue.elementAt(0); // FIFO
inputQueue.removeElementAt(0);
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("DEQUEUED a TLS Record to process, seq# = " + iQ.seqnum);
LOG.info("DEQUEUE waited " + wct + " times on empty input queue");
}
}
// Wake up enqueue thread(s)
notifyAll();
return iQ;
}
// 1) If JTlsRecord is empty, then
// read raw JXTA MSG element into the record.
// 2) Return requested bytes to our callers.
private MessageElement currentElt = null;
synchronized private int local_read(byte[] a, int offset, int length)
throws IOException
{
if (jtrec.size == 0 || jtrec.nextByte == jtrec.size) {
// reset the record
jtrec.resetRecord(); // GC as necessary(tlsRecord byte[])
sequenceNumber += 1; // next msg sequence number
// Does our vector contain the required element with
// this sequenceNumber?
currentElt = getVectorElement();
// no, try and dequeue another message
if (currentElt == null) {
// Get the next element
try {
// Place the next message on our InputStream in
// jmsg.
while (currentElt == null) {
IQElt iQ = null;
// read another message
// !! Here we will just GET the next message.
if ((iQ = dequeueMessage()) == null) {
continue;
} else {
// Return elt with expected sequence number
// or null. If null, read elements have been added to our
// inputElements queue.
currentElt = validateElement(iQ);
// Free all elements from jmsg. They have been queued
// in the inputElements Vector as noted above.
JTlsUtil.removeElements(iQ.msg);
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLSInputStream exception: " + e);
return -1;
}
}
// Send an ACK for the message that contained this TLS Record.
mrrSequenceNumber = sequenceNumber;
sendACK(0);
// Get the length of the TLS Record
jtrec.size = currentElt.getLength();
if (tlsInputDebug) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("local_read: seq#" + sequenceNumber +
", bytes = " + jtrec.size + "\n");
}
}
// get byte array
jtrec.tlsRecord = currentElt.getBytesOffset();
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("Rcvd TLS Record[" + jtrec.size + " bytes]" +
" Seq# = " + sequenceNumber);
// hex dump first 5 records
if (sequenceNumber < 6) {
String hex = JTlsUtil.toHex(jtrec.tlsRecord, 0, jtrec.size);
LOG.info("HexDump:\n" + hex);
}
}
}
// return the requested TLS Record data
// These calls should NEVER ask for more data than is in the
// received TLS Record.
int left = jtrec.size - jtrec.nextByte;
int copyLen = (left < length ? left : length);
System.arraycopy(jtrec.tlsRecord, jtrec.nextByte, a, offset, copyLen);
jtrec.nextByte += copyLen;
if (tlsInputDebug) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("\nRequested " + length + ", Read " + copyLen + " bytes");
}
}
return copyLen;
}
// Here we read the TLS Record data from the incoming JXTA message.
// (We will really have a full jxta message available.)
//
// TLS Record input only calls the following methods.
// They are called from SSLRecord.decode(SSLConn, Inputstream);
//
/*
* Read a byte from the TLS Record which was received in
* a JXTA Message.
*
* returns -1 on end of stream
*/
public int read()
throws java.io.IOException
{
byte[] a = new byte[1];
int i = local_read(a, 0, 1);
if (i != -1) {
if (LOG.isEnabledFor(Priority.INFO) &&
SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
LOG.info("Read() " + i);
}
i = (int)(a[0] & 0xFF); // The byte
}
return i;
}
// Give requested bytes to TLS.
public int read(byte[] a, int offset, int length)
throws java.io.IOException {
int i = local_read(a, offset, length);
if (LOG.isEnabledFor(Priority.INFO) &&
SSLDebug.getDebug(SSLDebug.DEBUG_JXTA)) {
LOG.info("Read(byte[], int, " + length + "), bytes read = " + i);
}
return i;
}
// Read into a byte array
public int read(byte[] a)
throws java.io.IOException {
int i = local_read(a, 0, a.length);
return i;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -