📄 jtlsoutputstream.java
字号:
// inserted = true;
// break;
// }
try {
if (i_seqnum.intValue() < ((Integer)v.elementAt(i)).intValue()) {
v.insertElementAt(i_seqnum, i);
inserted = true;
break;
}
} catch (NumberFormatException e) {
}
// end of changed code
}
if (!inserted)
//PDA requirement 18.02.2002
// Vector.add -> Vector.addElement
// v.add(i_seqnum);
v.addElement(i_seqnum);
}
return v;
}
private int callAVEIQ(int iq)
{
int n = nIQTests;
nIQTests += 1;
aveIQSize = ((n * aveIQSize) + iq)/nIQTests;
return aveIQSize;
}
// ack received:
// We only acknowledge messages received in sequence.
//
// The seqnum is for the largest unacknowledged seqnum
// the receipient has received.
//
// The sackList is a sequence of all of the received
// messages in the sender's input Q.
//
// Recepients are passive and only ack upon the receipt
// of an in sequence message.
//
// They depend on our RTO to fill holes in message
// sequences.
synchronized void ackReceived(int seqnum, String sackList)
{
lastACKTime = System.currentTimeMillis();
int n = 0;
// remove acknowledged messages from retrans Q.
int i = retrQ.size();
// We need to calculate how much free queue space the
// remote has (used also in SAC code below)
Vector sacs = null;
int rqLen = 1; // We don't know
if (sackList != null) {
// get a sorted vector of Integers
sacs = sortSackList(sackList);
rqLen = sacs.size();
}
// most recent remote IQ free space
mrrIQFreeSpace = rmaxQSize - rqLen;
if (i == 0) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! RETRANS Q EMPTY");
}
return;
}
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! RETRQ(SIZE = " + i + "):");
for (int y = 0; y < i; y++) {
RetrQElt r = (RetrQElt)retrQ.elementAt(y);
LOG.info("r(" + y + ")=" + r.seqnum);
}
}
long enqueuetime = 0;
// First monotonically increasing seq#s in vector
while (retrQ.size() > 0) {
RetrQElt r = (RetrQElt)retrQ.elementAt(0);
if (r.seqnum <= seqnum) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! SEQUENTIALLY ACKD SEQN = " + r.seqnum);
}
// Acknowledged
//PDA requirement 18.02.2002
// Vector.remove -> Vector.removeElementAt
// retrQ.remove(0);
retrQ.removeElementAt(0);
JTlsUtil.removeElements(r.msg);
// for aveRTT calculation
enqueuetime = r.enqueuedAt;
// Update RTT, RTO
if (enqueuetime != 0) {
calcRTT(enqueuetime);
}
r.msg = null;
r = null;
n += 1;
} else {
// End of sequentially acknowledgments
break;
}
}
if (sackList != null) {
// run through retrans Q removing all members that are on the
// sackList (also monotonically increasing - we hope)
int j = 0, l = 0;
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("SACK Vector(" + sackList + "):");
for (int z = 0; z < sacs.size(); z++) {
LOG.info("v(" + z + ")=" + (Integer)sacs.elementAt(z));
}
}
int sacnum = 0;
while (l < sacs.size() && retrQ.size() > 0) {
// next ackd seqnum
sacnum = ((Integer)sacs.elementAt(l++)).intValue();
boolean ackedMsg = false;
// OK. Find the Q member to remove
for (int k = j; k < retrQ.size(); k++) {
RetrQElt r = (RetrQElt)retrQ.elementAt(k);
if (r.seqnum == sacnum) {
// Acknowledged
//PDA requirement 18.02.2002
// Vector.remove -> Vector.removeElementAt
// retrQ.remove(k);
retrQ.removeElementAt(k);
// get the next sequence number
// (vector is left shifted)
j = k;
// ack counter
n += 1;
// for aveRTT calculation
enqueuetime = r.enqueuedAt;
// Update RTT, RTO
if (enqueuetime != 0) {
calcRTT(enqueuetime);
}
// GC this stuff
JTlsUtil.removeElements(r.msg);
r.msg = null;
r = null;
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! SACKD SEQN = " + sacnum);
ackedMsg = true;
break;
}
}
// Log any duplicate acknowledgements
if (!ackedMsg) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! Duplicate SACK = " + sacnum);
}
}
}
// let's look at average sacs.size(). If it is big, then this
// probably means we must back off because the system is slow.
// Our retrans Queue can be large and we can overwhelm the
// receiver with retransmissions.
// We will keep the rwin <= ave real input queue size.
int remoteIQ = sacs.size() ;
int aveIQ = callAVEIQ(remoteIQ);
// Retransmit? Only if there is a hole in the selected
// acknowledgement list. Otherwise let RTO deal.
// Given that this SACK acknowledged messages still
// in the retrQ:
// seqnum is the max consectively SACKD message.
// seqnum < sacnum means a message has not reached
// receiver. EG: sacklist == 10,11,13 seqnum == 11
// We retransmit 12.
if (retrQ.size() > 0) {
// get the seqnum of the first element
RetrQElt r = (RetrQElt)retrQ.elementAt(0);
// Assure there is a hole &&
// and it can be filled by the retrans. Q.
if (seqnum < sacnum && r.seqnum < sacnum) {
// determine hole size.
// all remaining elements on retrQ were
// not acked. We resend those in the window
// seqnum < n < sacnum
int ackwin = 1; // at least 1
int upper = (RWINDOW < retrQ.size() ? RWINDOW : retrQ.size());
for (i = 1; i < upper; i++) {
r = (RetrQElt)retrQ.elementAt(i);
if (r.seqnum >= sacnum) break;
else
ackwin += 1;
}
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("RETR: Fill hole, SACK, Seqnum = " + seqnum +
", Max seqnum = " + sacnum + ", Window =" + ackwin);
}
// retransmit 1 retq mem. only
retransmit(ackwin);
sackRetransTime = System.currentTimeMillis();
}
}
}
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! N ACKED = " + n);
}
}
static final private byte[] MARKText = {
(byte)'T', (byte)'L', (byte)'S', (byte)'R', (byte)'E', (byte)'T'
};
// mark a message as a retransmission
private void markRetransmit(Message msg)
{
MessageElement elt = msg.newMessageElement(JTlsDefs.MARKRETR,
null,
MARKText,
0,
MARKText.length);
msg.addElement(elt);
}
// retransmit unacknowledged messages
// The upper limit is rwin.
synchronized int retransmit(int rwin)
{
int i = (retrQ.size() < rwin ? retrQ.size() : rwin);
int i0 = i; // don't ask (-:
if (i > 0 && LOG.isEnabledFor(Priority.INFO)) {
LOG.info("TLS!! RETRANSMITING [rwindow = " + i0 + "]:");
}
int n = 0;
for (int j = 0; j < i; j++) {
RetrQElt r = (RetrQElt)retrQ.elementAt(j);
try {
// Mark message as retransmission
if (!r.marked) {
markRetransmit(r.msg);
r.marked = true;
}
// Make a copy to for sending
Message cmsg = (Message)r.msg.clone();
tp.sendToRemoteTls(this.destAddr, cmsg);
n += 1;
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! RETRANSMIT SEQN = " + r.seqnum);
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! IOErr while RETRANS SEQN = " + r.seqnum);
}
}
return n;
}
// A received message can force RTO to be reset through receive
// ACKS for retransmitted data
void triggerRetransmission()
{
long realWait = System.currentTimeMillis() - lastACKTime;
// Trigger restransmission on reasonably long RTO
// IE, receipt of a messge is a fake ACK.
if (realWait >= (RTO + minRTO)) retransmit(1);
}
// Retransmission thread
private class Retransmitter implements Runnable {
Thread th;
int nretransmitted;
int nAtThisRTO = 0;
public Retransmitter() {
this.nretransmitted = 0;
this.th = new Thread(this, "TLS Retransmit thread");
th.setDaemon(true);
th.start();
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! STARTED TLS Retransmit thread, RTO = " + RTO);
}
public int getRetransCount() {
return nretransmitted;
}
public void run() {
int idleCounter = 0;
sackRetransTime = System.currentTimeMillis();
while (true) {
try {
Thread.currentThread().sleep(RTO);
} catch (InterruptedException e) {
;
}
// see if we recently did a retransmit triggered by a SACK
long lastSackRetr = System.currentTimeMillis() - sackRetransTime;
if (lastSackRetr < RTO) {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("RETR: No retrans, sack retrans " + lastSackRetr + " ms ago");
}
continue;
}
// See how long we've waited since RTO was set
long realWait = System.currentTimeMillis() - lastACKTime;
long oldestWait = 0;
if (retrQ.size() > 0) {
oldestWait = System.currentTimeMillis() -
(long)((RetrQElt)(retrQ.elementAt(0))).enqueuedAt;
}
// get real wait as max of age of oldest in retrQ and
// lastAck time
realWait = (realWait < oldestWait? oldestWait : realWait);
// Retransmit only if RTO has expired.
// a. real wait time is longer than RTO
// b. oldest message on Q has been there longer
// than RTO. This is necessary because we may
// have just sent a message, and we do not
// want to overrun the receiver. Also, we
// do not want to restransmit a message that
// has not been idle for the RTO.
if (realWait >= RTO && oldestWait >= RTO) {
// retrasmit
int i = retransmit(RWINDOW);
// Total
nretransmitted += i;
// number at this RTO
nAtThisRTO += (i == 0 ? 0 : i);
// See if real wait is too long and queue is non-empty
// Remote may be dead - double until max.
// Double after window restransmitted msgs at this RTO
// exceeds the RWINDOW, and we've had no response for
// twice the current RTO.
if (i > 0 && realWait >= 2*RTO && nAtThisRTO >= 2*RWINDOW) {
RTO = (realWait > maxRTO ? maxRTO : 2*RTO);
nAtThisRTO = 0;
}
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! RETR: RTO(" + nAtThisRTO + ") = " + RTO + " N RETRANS = " +
i);
} else {
idleCounter += 1;
// reset RTO to min if we are idle
if (idleCounter == 2) {
RTO = minRTO;
idleCounter = 0;
}
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("TLS!! RETR IDLE: RTO = " + RTO + " WAIT = " + realWait);
}
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -