⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliablepipeservice.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	public synchronized Message waitForMessage ()
	    throws InterruptedException
	{
	    return poll (-1);
	}

	public synchronized Message poll (int msgTimeout)
	    throws InterruptedException
	{

	    return incomingQueue.pop (msgTimeout);
	}


	synchronized void process (Message msg) throws IOException {

	    DataInputStream header =
		new DataInputStream (msg.getElement (RELIABLE_HEADER).getStream());

	    long    seq    = header.readLong ();
	    int     type   = header.readInt ();
	    int     window = header.readInt ();


	    // ack the message
	    if (type != SYN  &&  type != SYN_ACK  &&  type != ACK) {
		incomingQueue2.put (msg, seq);

		highestInSeq =
		    incomingQueue2.popUntilBreakInSequence (highestInSeq);

		sendMsg (ACK, highestInSeq, incomingQueue.getFreeSlots ());
	    }


	    switch (state) {

	    case ESTABLISHING:
		switch (type) {

		case SYN:
		    // tell the other peer what my first sequence number is
		    outgoingSeq = new Random ().nextLong ();
		    sendMsg (SYN_ACK, outgoingSeq, 0);
		    break;
		case SYN_ACK:

		    lastAckedSeq = highestInSeq = seq;
		    state = CONNECTED;

		    // a thread is blocked in establish(), so tell it
		    // we've opened the connection
		    notifyAll ();
		    break;
		}
		break;
	    case CONNECTED:
		switch (type) {
		case MSG:

		    incomingQueue.put (msg, seq);

		    break;

		case ACK:

		    Object pendingRetransmit =
			pendingRetransmits.get (new Long (seq));

		    if (pendingRetransmit != null) {
		    // cancel the pending retransmit
			schedulerService.cancelAction (pendingRetransmit);
			pendingRetransmits.remove (new Long (seq));

			// slide the window forward
			outgoingQueue.slide (seq - lastAckedSeq);
			lastAckedSeq = seq;

			// note the window size the peer wants
			outgoingWindow = window;
		    }

		    break;

		case FIN1:
		    // peer at other end wants to close
		    // tell sender to send everything we have, then quit

		    state = CLOSE_WAIT;

		    break;
		}

	    case CLOSE_WAIT:
		switch (type) {
		case FIN2:
		    // other peer flushed all his buffers to us
		    state = CLOSED;

		    break;
		}
	    }




	}


	void sendMsg (int type, long seq, int window) throws IOException {
	    Message msg = bidirPipeService.pipeService.createMessage ();

	    setHeader (msg, type, seq, window);

	    outputPipe.send (msg);

	    if (type != ACK) {
		Object pendingRetransmit =
		    schedulerService.scheduleAction (new RetransmitAction(msg),
						     RETRANSMIT_TIMEOUT);

		pendingRetransmits.put (msg, pendingRetransmit);
	    }
	}

	void sendMsg (Message msg, long seq) throws IOException {

	    setHeader (msg, MSG, seq, 0);

	    outputPipe.send (msg);

	    Object pendingRetransmit =
		schedulerService.scheduleAction	(new RetransmitAction (msg),
						 RETRANSMIT_TIMEOUT);

	    pendingRetransmits.put (msg, pendingRetransmit);
	}


	void setHeader (Message msg, int type, long seq, int window)
	    throws IOException
	{
	    ByteArrayOutputStream out = new ByteArrayOutputStream (12);
	    DataOutputStream      dout = new DataOutputStream (out);

	    dout.writeLong (seq);
	    dout.writeInt (type);
	    dout.writeInt (window);

	    msg.newMessageElement (RELIABLE_HEADER,
				   null,
				   out.toByteArray ());
	}

	class RetransmitAction implements SchedulerService.Action {

	    Message msg;
	    int     retransmits;

	    RetransmitAction (Message msg) {
		this.msg  = msg;
	    }

	    public void perform (SchedulerService ss) {
		try {
		    outputPipe.send (msg);
		} catch (IOException e) {
		    // nothing to do, will quit when retransmit count maxes out
		}

		if (retransmits < MAX_RETRANSMITS) {
		    Object pendingRetransmit =
			ss.scheduleAction (this, RETRANSMIT_TIMEOUT);

		    pendingRetransmits.put (msg, pendingRetransmit);
		} else {
		    // perhaps the connection has been dropped by the other
		    // peer
		    // is the following right, though?  should we wait?
		    state = TIMED_OUT;
		    notifyAll ();
		}
	    }
	}
    }
}




class IncomingMessageQueue extends PriorityQueue {
    int  capacity;
    int  total;
    long nextExpected;

    IncomingMessageQueue (int size) {
	initialize (size, 2);

	this.capacity = size;
    }


    protected boolean lessThan (Object o1, Object o2) {
	return ((IncomingMessage) o1).seq < ((IncomingMessage) o2).seq;
    }

    public synchronized void put (Message msg, long seq) {
	put (new IncomingMessage (msg, seq));
	total++;
	notifyAll ();
    }


    public synchronized long popUntilBreakInSequence (long lastSeq) {
	IncomingMessage msg;

	while (top () != null) {
	    msg = (IncomingMessage) top ();

	    if (lastSeq + 1 == msg.seq) {
		lastSeq = msg.seq;
		pop ();
	    } else {
		break;
	    }
	}

	return lastSeq;
    }

    /**
     * Wait until the next in-sequence message is pushed,
     * then return it.
     */

    public synchronized Message pop (long maxTimeout)
	throws InterruptedException
    {

	IncomingMessage im;
	long start = System.currentTimeMillis ();
	long timeout;

	while (true) {

	    while (top () != null) {
		timeout = maxTimeout - (System.currentTimeMillis () - start);
		if (timeout > 0)
		    wait (timeout);
		else
		    break;
	    }

	    im = (IncomingMessage) top ();

	    if (im == null) {
		return null;
	    } else if (im.seq == nextExpected) {
		pop ();
		total--;
		nextExpected = im.seq + 1;
		return im.msg;
	    } else {
		continue;
	    }
	}

    }

    public int getFreeSlots () {
	return capacity - total;
    }

    class IncomingMessage {
	Message msg;
	long    seq;

	IncomingMessage (Message msg, long seq) {
	    this.msg = msg;
	    this.seq = seq;
	}
    }

}


class OutgoingMessageQueue {
    Message[] arr;
    int       head;
    int       tail;
    int       total;

    OutgoingMessageQueue (int size) {
	arr = new Message[size];
    }

    synchronized void enqueue (Message msg) throws InterruptedException {

	while (isFull ())
	    wait ();

	arr[tail] = msg;
	tail = (tail+1) % arr.length;
	total++;

	notifyAll ();
    }

    synchronized void slide (long n) {
	for (int i = 0;  i < n;  i++) {
	    arr[head] = null;
	    head = (head + 1) % arr.length;
	}
    }

    synchronized Message get (int i, long maxTimeout)
	throws InterruptedException
    {
	long start;

	while (total < i) {
	    start = System.currentTimeMillis ();

	    wait (maxTimeout);

	    if (System.currentTimeMillis () - start >= maxTimeout)
		return null;
	}

	return arr[(head+i) % arr.length];
    }

    synchronized Message dequeue (Message msg, long maxTimeout)
	throws InterruptedException
    {
	Message m;
	long    start;

	while (isEmpty ()) {
	    start = System.currentTimeMillis ();

	    wait (maxTimeout);

	    if (System.currentTimeMillis () - start >= maxTimeout) {

		return null;
	    }
	}

	m = arr[head];

	arr[head] = null;
	head = (head+1) % arr.length;
	total--;

	return m;
    }

    boolean isEmpty () {
	return total == 0;
    }

    boolean isFull () {
	return total == arr.length;
    }

    int getMessageCount () {
	return total;
    }

    int getFreeSlots () {
	return arr.length - total;
    }
}


	class Timer {

	    long start;

	    void start () {
		start = System.currentTimeMillis ();
	    }

	    long elapsed () {
		return System.currentTimeMillis () - start;
	    }
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -