📄 reliablepipeservice.java
字号:
sendMsg (ACK, highestInSeq, incomingQueue.getFreeSlots ());
}
switch (state) {
case ESTABLISHING:
switch (type) {
case SYN:
LOG.debug("WATCH: Received SYN, Sending SYN_ACK");
// tell the other peer what my first sequence number is
outgoingSeq = new Random ().nextLong ();
sendMsg (SYN_ACK, outgoingSeq++, 1);
break;
case SYN_ACK:
LOG.debug("WATCH: Received SYN_ACK");
Object pendingRetransmit =
pendingRetransmits.get (new Long (synSeq));
if (pendingRetransmit != null) {
// cancel the pending retransmit
LOG.debug("Canceling retransmit for " + synSeq);
schedulerService.cancelAction (pendingRetransmit);
pendingRetransmits.remove (new Long (synSeq));
} else {
LOG.debug("No retransmit scheduled for " + synSeq);
}
lastAckedSeq = highestInSeq = seq;
state = CONNECTED;
incomingQueue.setNextExpected(seq + 1);
outgoingWindow = window;
// a thread is blocked in establish(), so tell it
// we've opened the connection
synchronized (lock) {
lock.notifyAll ();
}
break;
}
break;
case CONNECTED:
switch (type) {
case MSG:
LOG.debug("WATCH: putting msg " + seq + " on incoming queue *****");
incomingQueue.put (msg, seq);
notifyListeners();
break;
case ACK:
LOG.debug("WATCH: got ACK " + seq);
Object pendingRetransmit = pendingRetransmits.get (new Long (seq));
if (pendingRetransmit != null) {
// cancel the pending retransmit
LOG.debug("Canceling retransmit for " + seq);
schedulerService.cancelAction (pendingRetransmit);
pendingRetransmits.remove (new Long (seq));
LOG.debug("WATCH: seq = " + seq +
" lastAckedSeq = " + lastAckedSeq);
//outgoingQueue.slide (seq - lastAckedSeq);
lastAckedSeq = seq;
} else {
LOG.debug("No retransmit scheduled for " + seq);
}
outgoingWindow = window;
// note the window size the peer wants
break;
case FIN1:
// peer at other end wants to close
// tell sender to send everything we have, then quit
state = CLOSE_WAIT;
break;
}
case CLOSE_WAIT:
switch (type) {
case FIN2:
// other peer flushed all his buffers to us
state = CLOSED;
break;
}
}
}
void sendMsg (int type, long seq, int window) throws IOException {
Message msg = bidirPipeService.pipeService.createMessage ();
setHeader (msg, type, seq, window);
outputPipe.send (msg);
if (type != ACK && type != SYN_ACK) {
Object pendingRetransmit =
schedulerService.scheduleAction (new RetransmitAction(msg),
RETRANSMIT_TIMEOUT);
pendingRetransmits.put (new Long(seq), pendingRetransmit);
}
}
void sendMsg (Message msg, long seq) throws IOException {
setHeader (msg, MSG, seq, 0);
outputPipe.send (msg);
Object pendingRetransmit =
schedulerService.scheduleAction (new RetransmitAction (msg),
RETRANSMIT_TIMEOUT);
pendingRetransmits.put (new Long(seq), pendingRetransmit);
}
void setHeader (Message msg, int type, long seq, int window)
throws IOException
{
ByteArrayOutputStream out = new ByteArrayOutputStream (12);
DataOutputStream dout = new DataOutputStream (out);
dout.writeLong (seq);
dout.writeInt (type);
dout.writeInt (window);
MessageElement msgElem =
msg.newMessageElement(RELIABLE_HEADER,
null,
out.toByteArray ());
msg.addElement(msgElem);
}
class RetransmitAction implements SchedulerService.Action {
Message msg;
int retransmits;
RetransmitAction (Message msg) {
this.msg = msg;
}
public void perform (SchedulerService ss) {
long seq = 0;
try {
DataInputStream header =
new DataInputStream (
msg.getElement(RELIABLE_HEADER).getStream());
seq = header.readLong ();
int type = header.readInt ();
int window = header.readInt ();
LOG.debug("Retransmitting " + seq + " type " + type);
outputPipe.send (msg);
} catch (IOException e) {
// nothing to do, will quit when retransmit count maxes out
}
if (retransmits < MAX_RETRANSMITS) {
Object pendingRetransmit =
ss.scheduleAction (this, RETRANSMIT_TIMEOUT);
pendingRetransmits.put (new Long(seq), pendingRetransmit);
retransmits++;
} else {
// perhaps the connection has been dropped by the other
// peer
// is the following right, though? should we wait?
state = TIMED_OUT;
synchronized (lock) {
lock.notifyAll ();
}
}
}
}
}
}
class IncomingMessageQueue extends PriorityQueue {
private final static Category LOG =
Category.getInstance(IncomingMessageQueue.class);
int capacity;
int total;
long nextExpected;
IncomingMessageQueue (int size) {
initialize (size, 2);
this.capacity = size;
}
protected boolean lessThan (Object o1, Object o2) {
return ((IncomingMessage) o1).seq < ((IncomingMessage) o2).seq;
}
public synchronized void put (Message msg, long seq) {
put (new IncomingMessage (msg, seq));
total++;
notifyAll ();
}
public synchronized void setNextExpected(long nextExpected) {
this.nextExpected = nextExpected;
}
public synchronized long popUntilBreakInSequence (long lastSeq) {
IncomingMessage msg;
while (top () != null) {
msg = (IncomingMessage) top ();
// okay we got the top()
if (lastSeq + 1 == msg.seq) {
// if it's one more than the previous, pop it off.
lastSeq = msg.seq;
pop ();
total--;
} else {
break;
}
}
return lastSeq;
}
/**
* Wait until the next in-sequence message is pushed,
* then return it.
*/
/*
public synchronized Message pop (long maxTimeout)
throws InterruptedException {
IncomingMessage im;
Timer t = new Timer();
t.start();
long timeout;
while (true) {
while (top () == null) {
if(maxTimeout == -1) {
wait();
} else {
timeout = maxTimeout - t.elapsed();
if (timeout > 0) {
wait (timeout);
} else {
//return null; // should break out of outside loop
break;
}
}
}
im = (IncomingMessage) top ();
if (im == null) {
return null;
} else if (im.seq == nextExpected) {
LOG.debug("im.seq = " + im.seq + " nextExpected = "
+ nextExpected);
pop ();
total--;
nextExpected = im.seq + 1;
return im.msg;
} else {
LOG.debug("im.seq = " + im.seq + " nextExpected = "
+ nextExpected);
continue;
}
}
}
*/
public synchronized Message popNext() {
IncomingMessage im;
im = (IncomingMessage)top();
if (im == null) {
return null;
}
// get rid of all the ones less than expected.
// they are duplicates due to retransmissions
while (im != null && im.seq < nextExpected) {
LOG.debug("WATCH: less than expected, nextExpected = " +
nextExpected + " seq = " + im.seq);
pop();
total--;
im = (IncomingMessage)top();
}
if (im != null && im.seq == nextExpected) {
// this is the one we want!
LOG.debug("WATCH: got expected " + nextExpected);
pop();
total--;
nextExpected += 1;
return im.msg;
} else {
return null;
}
}
public synchronized Message pop(long maxTimeout)
throws InterruptedException {
Timer t = new Timer();
t.start();
IncomingMessage im;
if(maxTimeout == -1) {
maxTimeout = Long.MAX_VALUE;
}// just a big number to get started
long timeLeft = maxTimeout;
while (timeLeft > 0) {
timeLeft = maxTimeout - t.elapsed();
if (timeLeft < 0 ) {
break;
}
im = (IncomingMessage)top();
if ( (im == null) || (im.seq > nextExpected) ) {
// the one we want hasn't arrived yet
LOG.debug("WATCH: waiting for " + nextExpected);
wait(timeLeft); // hope we get awakened with the right one
} else if (im.seq == nextExpected) {
// this is the one we want!
LOG.debug("WATCH: got expected " + nextExpected);
pop();
total--;
nextExpected += 1;
return im.msg;
} else {
// seq is less than expected: it must be a dup, so dump it
LOG.debug("WATCH: less than expected, nextExpected = " +
nextExpected + " seq = " + im.seq);
pop();
total--;
}
}
// if we get here, we went all the way through the loop without
// ever returning the right one :-(
LOG.debug("WATCH: returning null");
return null;
}
public int getFreeSlots () {
return capacity - total;
}
class IncomingMessage {
Message msg;
long seq;
IncomingMessage (Message msg, long seq) {
this.msg = msg;
this.seq = seq;
}
}
}
class OutgoingMessageQueue {
private final static Category LOG =
Category.getInstance(OutgoingMessageQueue.class);
Message[] arr;
int head;
int tail;
int total = 0;
OutgoingMessageQueue (int size) {
arr = new Message[size];
}
synchronized void enqueue (Message msg) throws InterruptedException {
while (isFull ()) {
wait ();
}
arr[tail] = msg;
tail = (tail+1) % arr.length;
total++;
LOG.debug("WATCH: Total messages on queue = " + total);
notifyAll ();
}
synchronized void slide (long n) {
for (int i = 0; i < n; i++) {
arr[head] = null;
total--;
head = (head + 1) % arr.length;
}
}
synchronized Message get (int i, long maxTimeout)
throws InterruptedException {
Timer t = new Timer();
while (total < i) {
t.start();
wait (maxTimeout);
if (t.elapsed() >= maxTimeout) {
return null;
}
}
return arr[(head+i) % arr.length];
}
synchronized Message dequeue (long maxTimeout)
throws InterruptedException {
Message m;
Timer t = new Timer();
while (isEmpty ()) {
t.start();
wait (maxTimeout);
if (t.elapsed() >= maxTimeout) {
return null;
}
}
m = arr[head];
arr[head] = null;
head = (head+1) % arr.length;
total--;
LOG.debug("WATCH Total messages in outgoing queue is " + total);
return m;
}
boolean isEmpty () {
return total == 0;
}
boolean isFull () {
return total == arr.length;
}
int getMessageCount () {
return total;
}
int getFreeSlots () {
return arr.length - total;
}
}
class Timer {
private final static Category LOG =
Category.getInstance(Timer.class);
long start;
void start () {
start = System.currentTimeMillis();
// LOG.debug("WATCH: System time at start = " + start);
}
long elapsed () {
long result = 0;
long currTime = System.currentTimeMillis();
// LOG.debug("WATCH: System time at elapsed = " + currTime);
result = currTime - start;
return result;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -