📄 reliablepipeservice.java
字号:
/**
 * Retrieves the next incoming message by delegating to {@link #poll(int)}
 * with a timeout of -1. How a negative timeout is interpreted is decided by
 * the incoming queue's pop method.
 *
 * Deliberately NOT synchronized on this object: process() is synchronized
 * on the same monitor and is the method that puts messages into
 * incomingQueue, so holding the monitor while blocked here would keep the
 * receive path from delivering the very message being waited for. The
 * queue does its own locking.
 *
 * @return the message returned by poll, possibly null
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public Message waitForMessage ()
    throws InterruptedException
{
    return poll (-1);
}
/**
 * Takes the next message from the incoming queue, passing the timeout
 * straight through to the queue's pop method.
 *
 * Deliberately NOT synchronized on this object: process() is synchronized
 * on the same monitor and is the method that feeds incomingQueue. A caller
 * blocked inside the queue while still owning this monitor would lock the
 * receive path out for the whole wait. The queue's pop is itself
 * synchronized, so no extra locking is needed here.
 *
 * @param msgTimeout timeout handed to incomingQueue.pop
 * @return the message returned by the queue, or null if it produced none
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public Message poll (int msgTimeout)
    throws InterruptedException
{
    return incomingQueue.pop (msgTimeout);
}
/**
 * Handles one message received from the peer: decodes the reliability
 * header, acknowledges anything that is not a SYN, SYN_ACK or ACK, and then
 * advances the connection state machine according to the current state and
 * the message type.
 *
 * @param msg received message; must carry a RELIABLE_HEADER element holding
 *            a long sequence number, an int type and an int window, in that
 *            order (the layout written by setHeader)
 * @throws IOException if the header cannot be read or a reply cannot be sent
 */
synchronized void process (Message msg) throws IOException {
    DataInputStream header =
        new DataInputStream (msg.getElement (RELIABLE_HEADER).getStream ());
    long seq = header.readLong ();
    int type = header.readInt ();
    int window = header.readInt ();

    // ack the message: record it, advance the highest in-sequence number
    // seen so far, and report that number plus our free receive slots
    if (type != SYN && type != SYN_ACK && type != ACK) {
        incomingQueue2.put (msg, seq);
        highestInSeq =
            incomingQueue2.popUntilBreakInSequence (highestInSeq);
        sendMsg (ACK, highestInSeq, incomingQueue.getFreeSlots ());
    }

    switch (state) {
    case ESTABLISHING:
        switch (type) {
        case SYN:
            // tell the other peer what my first sequence number is
            outgoingSeq = new Random ().nextLong ();
            sendMsg (SYN_ACK, outgoingSeq, 0);
            break;
        case SYN_ACK:
            lastAckedSeq = highestInSeq = seq;
            state = CONNECTED;
            // a thread is blocked in establish(), so tell it
            // we've opened the connection
            notifyAll ();
            break;
        }
        break;
    case CONNECTED:
        switch (type) {
        case MSG:
            incomingQueue.put (msg, seq);
            break;
        case ACK:
            // Box the sequence number once; Long.valueOf replaces the
            // deprecated Long constructor and yields an equal map key.
            // NOTE(review): this lookup only succeeds if the code that
            // registers pending retransmits also keys them by sequence
            // number - verify at the registration sites.
            Long ackedSeq = Long.valueOf (seq);
            Object pendingRetransmit = pendingRetransmits.get (ackedSeq);
            if (pendingRetransmit != null) {
                // cancel the pending retransmit
                schedulerService.cancelAction (pendingRetransmit);
                pendingRetransmits.remove (ackedSeq);
                // slide the window forward
                outgoingQueue.slide (seq - lastAckedSeq);
                lastAckedSeq = seq;
                // note the window size the peer wants
                outgoingWindow = window;
            }
            break;
        case FIN1:
            // peer at other end wants to close
            // tell sender to send everything we have, then quit
            state = CLOSE_WAIT;
            break;
        }
        // Falls through: there is no break here, so a FIN2 that arrives
        // while still CONNECTED is handled by the CLOSE_WAIT branch below
        // and closes the connection.
        // NOTE(review): confirm the fall-through is intended; it is kept
        // as-is because adding a break would change how an early FIN2 is
        // treated.
    case CLOSE_WAIT:
        switch (type) {
        case FIN2:
            // other peer flushed all his buffers to us
            state = CLOSED;
            break;
        }
    }
}
/**
 * Creates a header-only control message, sends it, and - for every type
 * except ACK - schedules it for retransmission.
 *
 * @param type   message type written into the header
 * @param seq    sequence number written into the header; also the key under
 *               which the pending retransmit is registered
 * @param window window size written into the header
 * @throws IOException if the header cannot be written or the send fails
 */
void sendMsg (int type, long seq, int window) throws IOException {
    Message msg = bidirPipeService.pipeService.createMessage ();
    setHeader (msg, type, seq, window);
    outputPipe.send (msg);
    if (type != ACK) {
        scheduleRetransmit (msg, seq);
    }
}

/**
 * Stamps a MSG header (window 0) onto an application message, sends it and
 * schedules it for retransmission.
 *
 * @param msg message to send; receives the reliability header
 * @param seq sequence number written into the header; also the key under
 *            which the pending retransmit is registered
 * @throws IOException if the header cannot be written or the send fails
 */
void sendMsg (Message msg, long seq) throws IOException {
    setHeader (msg, MSG, seq, 0);
    outputPipe.send (msg);
    scheduleRetransmit (msg, seq);
}

/**
 * Schedules a retransmission of msg and records the scheduler handle under
 * the boxed sequence number. process() looks pending retransmits up (and
 * removes them) by boxed sequence number when an ACK arrives, so the handle
 * has to be stored under that key - storing it under the Message object
 * means an ACK can never find, and therefore never cancel, it.
 */
private void scheduleRetransmit (Message msg, long seq) {
    Object pendingRetransmit =
        schedulerService.scheduleAction (new RetransmitAction (msg, seq),
                                         RETRANSMIT_TIMEOUT);
    pendingRetransmits.put (Long.valueOf (seq), pendingRetransmit);
}

/**
 * Marks the connection as timed out and wakes every thread waiting on this
 * object. Synchronized because notifyAll requires the caller to own the
 * monitor, and because the waiters being targeted wait on this object, not
 * on the RetransmitAction that detects the timeout.
 */
private synchronized void retransmitsExhausted () {
    // perhaps the connection has been dropped by the other peer
    // is the following right, though? should we wait?
    state = TIMED_OUT;
    notifyAll ();
}

/**
 * Writes the reliability header into msg as a RELIABLE_HEADER element:
 * a long sequence number, an int type and an int window, in that order.
 *
 * @param msg    message that receives the header element
 * @param type   message type
 * @param seq    sequence number
 * @param window window size
 * @throws IOException if encoding the header fails
 */
void setHeader (Message msg, int type, long seq, int window)
    throws IOException
{
    // 8 bytes (long) + 4 (int) + 4 (int): presize for the full header
    ByteArrayOutputStream out = new ByteArrayOutputStream (16);
    DataOutputStream dout = new DataOutputStream (out);
    dout.writeLong (seq);
    dout.writeInt (type);
    dout.writeInt (window);
    dout.flush ();
    msg.newMessageElement (RELIABLE_HEADER,
                           null,
                           out.toByteArray ());
}

/**
 * Scheduler action that resends a message and re-arms itself until the
 * retransmit budget (MAX_RETRANSMITS) is used up, at which point the
 * connection is marked as timed out.
 */
class RetransmitAction implements SchedulerService.Action {
    Message msg;
    /** Number of retransmissions performed so far. */
    int retransmits;
    /** Key under which this action's scheduler handle lives in pendingRetransmits. */
    Object key;

    /**
     * Creates an action whose handle is registered under the message
     * itself. Kept for existing callers; such entries are not found by a
     * lookup keyed on sequence number.
     */
    RetransmitAction (Message msg) {
        this.msg = msg;
        this.key = msg;
    }

    /**
     * Creates an action whose handle is registered under the boxed
     * sequence number, matching the key used when an ACK is processed.
     */
    RetransmitAction (Message msg, long seq) {
        this.msg = msg;
        this.key = Long.valueOf (seq);
    }

    /**
     * Resends the message, then either re-schedules itself or, once
     * MAX_RETRANSMITS resends have been made, gives up.
     *
     * @param ss scheduler used to re-arm this action
     */
    public void perform (SchedulerService ss) {
        try {
            outputPipe.send (msg);
        } catch (IOException ignored) {
            // nothing to do, will quit when retransmit count maxes out
        }
        // Count this attempt; without the increment the budget below is
        // never reached and the message is resent forever.
        retransmits++;
        if (retransmits < MAX_RETRANSMITS) {
            Object pendingRetransmit =
                ss.scheduleAction (this, RETRANSMIT_TIMEOUT);
            // replace the handle under the same key so it can still be cancelled
            pendingRetransmits.put (key, pendingRetransmit);
        } else {
            // no further attempt is pending, so drop the stale handle
            pendingRetransmits.remove (key);
            // An unqualified notifyAll() here would target this action's
            // own monitor (which is not held); delegate to the enclosing
            // instance instead.
            retransmitsExhausted ();
        }
    }
}
}
}
/**
 * Receive-side queue that orders messages by sequence number and hands them
 * out strictly in sequence. Built on the project's PriorityQueue base class
 * (the one providing initialize/put/top/pop/lessThan), with the smallest
 * sequence number on top.
 *
 * All mutating operations synchronize on the queue itself; put wakes
 * threads blocked in pop(long).
 */
class IncomingMessageQueue extends PriorityQueue {
    /** Size the queue was created with; basis for getFreeSlots. */
    int capacity;
    /** Number of messages currently held. */
    int total;
    /**
     * Sequence number pop(long) delivers next. Nothing in this class sets
     * it other than pop(long) itself - presumably the owner initializes it
     * to the peer's first sequence number; TODO confirm.
     */
    long nextExpected;

    /**
     * @param size capacity of the queue
     */
    IncomingMessageQueue (int size) {
        // NOTE(review): the meaning of the second argument (2) is defined
        // by the base class and is not visible here.
        initialize (size, 2);
        this.capacity = size;
    }

    /** Orders entries by ascending sequence number. */
    protected boolean lessThan (Object o1, Object o2) {
        return ((IncomingMessage) o1).seq < ((IncomingMessage) o2).seq;
    }

    /**
     * Adds a message and wakes any thread waiting in pop(long).
     *
     * @param msg the received message
     * @param seq its sequence number
     */
    public synchronized void put (Message msg, long seq) {
        put (new IncomingMessage (msg, seq));
        total++;
        notifyAll ();
    }

    /**
     * Removes the run of consecutively numbered messages that directly
     * follows lastSeq and returns the last sequence number of that run.
     *
     * NOTE(review): an entry whose sequence number is at or below lastSeq
     * (e.g. a duplicate) stays on top and stops the run; confirm whether
     * duplicates can reach this queue.
     *
     * @param lastSeq highest in-sequence number seen so far
     * @return the new highest in-sequence number (lastSeq if nothing followed)
     */
    public synchronized long popUntilBreakInSequence (long lastSeq) {
        IncomingMessage head;
        while ((head = (IncomingMessage) top ()) != null
               && head.seq == lastSeq + 1) {
            lastSeq = head.seq;
            pop ();
            // keep the occupancy count in step with put(), which increments it
            total--;
        }
        return lastSeq;
    }

    /**
     * Wait until the next in-sequence message is pushed,
     * then return it.
     *
     * @param maxTimeout maximum time to wait in milliseconds; a negative
     *                   value waits indefinitely (callers in this file pass
     *                   -1 for that), zero does not wait at all
     * @return the message whose sequence number equals nextExpected, or
     *         null if none became available within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Message pop (long maxTimeout)
        throws InterruptedException
    {
        final boolean waitForever = maxTimeout < 0;
        final long start = System.currentTimeMillis ();
        while (true) {
            IncomingMessage head = (IncomingMessage) top ();
            if (head != null && head.seq == nextExpected) {
                pop ();
                total--;
                nextExpected = head.seq + 1;
                return head.msg;
            }
            // Nothing deliverable yet (queue empty, or the head is out of
            // sequence): block until put() notifies or the time is up.
            // wait() releases the monitor, so put() can get in.
            if (waitForever) {
                wait ();
            } else {
                long remaining =
                    maxTimeout - (System.currentTimeMillis () - start);
                if (remaining <= 0)
                    return null;
                wait (remaining);
            }
        }
    }

    /**
     * @return capacity minus the number of messages currently held
     */
    public synchronized int getFreeSlots () {
        return capacity - total;
    }

    /** A received message paired with its sequence number. */
    class IncomingMessage {
        Message msg;
        long seq;

        IncomingMessage (Message msg, long seq) {
            this.msg = msg;
            this.seq = seq;
        }
    }
}
/**
 * Fixed-capacity circular buffer of messages waiting to be sent or
 * acknowledged. Producers block in enqueue while the buffer is full;
 * consumers can peek at a position (get), take the head (dequeue) or drop
 * a run of entries from the head (slide).
 *
 * All operations synchronize on the queue. Every operation that changes
 * the number of held messages calls notifyAll so blocked producers and
 * consumers re-check their condition.
 *
 * Timeouts are in milliseconds: a negative value waits indefinitely, zero
 * does not wait, a positive value bounds the total wait.
 */
class OutgoingMessageQueue {
    Message[] arr;
    /** Index of the oldest held message. */
    int head;
    /** Index where the next message is stored. */
    int tail;
    /** Number of messages currently held. */
    int total;

    /**
     * @param size capacity of the buffer
     */
    OutgoingMessageQueue (int size) {
        arr = new Message[size];
    }

    /**
     * Appends a message, blocking while the buffer is full.
     *
     * @param msg message to append
     * @throws InterruptedException if interrupted while waiting for space
     */
    synchronized void enqueue (Message msg) throws InterruptedException {
        while (isFull ())
            wait ();
        arr[tail] = msg;
        tail = (tail + 1) % arr.length;
        total++;
        notifyAll ();
    }

    /**
     * Drops up to n messages from the head of the buffer.
     *
     * The count is clamped to the number of held messages so head can
     * never run past tail, and total is decremented for every dropped
     * message - otherwise the freed slots are never seen by isFull() and
     * producers blocked in enqueue() stay blocked.
     *
     * @param n number of messages to drop; values below one drop nothing
     */
    synchronized void slide (long n) {
        long count = Math.min (n, (long) total);
        for (long i = 0; i < count; i++) {
            arr[head] = null;
            head = (head + 1) % arr.length;
            total--;
        }
        if (count > 0)
            notifyAll ();
    }

    /**
     * Returns, without removing it, the message i positions after the
     * head, waiting until that many messages are held.
     *
     * @param i          zero-based offset from the head
     * @param maxTimeout how long to wait for position i to be filled
     * @return the message at that position, or null on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    synchronized Message get (int i, long maxTimeout)
        throws InterruptedException
    {
        // position i exists only once i + 1 messages are held
        if (!awaitTotal ((long) i + 1, maxTimeout))
            return null;
        return arr[(head + i) % arr.length];
    }

    /**
     * Removes and returns the oldest message, waiting until one is held.
     *
     * @param msg        not used by this method; kept so existing callers
     *                   keep compiling
     * @param maxTimeout how long to wait for a message
     * @return the removed message, or null on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    synchronized Message dequeue (Message msg, long maxTimeout)
        throws InterruptedException
    {
        if (!awaitTotal (1, maxTimeout))
            return null;
        Message m = arr[head];
        arr[head] = null;
        head = (head + 1) % arr.length;
        total--;
        // a slot was freed: let producers blocked in enqueue() re-check
        notifyAll ();
        return m;
    }

    /**
     * Waits until at least `needed` messages are held. Must be called with
     * this queue's monitor held. The wait is measured against a single
     * start time, so repeated wakeups cannot extend it, and the condition
     * is re-tested after every wakeup before giving up.
     *
     * @return true once the condition holds, false if the timeout expired first
     */
    private boolean awaitTotal (long needed, long maxTimeout)
        throws InterruptedException
    {
        final long start = System.currentTimeMillis ();
        while (total < needed) {
            if (maxTimeout < 0) {
                wait ();
            } else {
                long remaining =
                    maxTimeout - (System.currentTimeMillis () - start);
                if (remaining <= 0)
                    return false;
                wait (remaining);
            }
        }
        return true;
    }

    /** @return true when no messages are held */
    synchronized boolean isEmpty () {
        return total == 0;
    }

    /** @return true when every slot is occupied */
    synchronized boolean isFull () {
        return total == arr.length;
    }

    /** @return number of messages currently held */
    synchronized int getMessageCount () {
        return total;
    }

    /** @return number of unoccupied slots */
    synchronized int getFreeSlots () {
        return arr.length - total;
    }
}
/**
 * Minimal stopwatch based on the wall clock
 * ({@link System#currentTimeMillis()}).
 */
class Timer {
    /** Wall-clock time in milliseconds recorded by the last call to start(); 0 until then. */
    long start;

    /** Records the current wall-clock time as the reference point. */
    void start () {
        final long now = System.currentTimeMillis ();
        this.start = now;
    }

    /**
     * @return milliseconds between the recorded reference point and now
     */
    long elapsed () {
        final long now = System.currentTimeMillis ();
        return now - this.start;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -