⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliablepipeservice.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            sendMsg (ACK, highestInSeq, incomingQueue.getFreeSlots ());
	    }

	    
	    switch (state) {

	    case ESTABLISHING:
		switch (type) {
		    
            case SYN:
                LOG.debug("WATCH: Received SYN, Sending SYN_ACK");
                // tell the other peer what my first sequence number is
                outgoingSeq = new Random ().nextLong ();
                sendMsg (SYN_ACK, outgoingSeq++, 1);
                break;
            case SYN_ACK:
                LOG.debug("WATCH: Received SYN_ACK");
                Object pendingRetransmit =
                            pendingRetransmits.get (new Long (synSeq));

                if (pendingRetransmit != null) {
                    // cancel the pending retransmit
                    LOG.debug("Canceling retransmit for " + synSeq);
                    schedulerService.cancelAction (pendingRetransmit);
                    pendingRetransmits.remove (new Long (synSeq));
                } else {
                    LOG.debug("No retransmit scheduled for " + synSeq);
                }
                
                lastAckedSeq = highestInSeq = seq;
                state = CONNECTED;
                incomingQueue.setNextExpected(seq + 1);
                outgoingWindow = window;
                // a thread is blocked in establish(), so tell it
                // we've opened the connection
                synchronized (lock) {
                    lock.notifyAll ();
                }
                break;
        }
		break;
	    case CONNECTED:
		switch (type) {
		case MSG: 
            LOG.debug("WATCH: putting msg " + seq + " on incoming queue *****");
		    incomingQueue.put (msg, seq);
            notifyListeners();
		    break;
				
		case ACK:
            LOG.debug("WATCH: got ACK " + seq);
		    
		    Object pendingRetransmit = pendingRetransmits.get (new Long (seq));
		    
		    if (pendingRetransmit != null) {
                // cancel the pending retransmit
                LOG.debug("Canceling retransmit for " + seq);
                schedulerService.cancelAction (pendingRetransmit);
                pendingRetransmits.remove (new Long (seq));
                    LOG.debug("WATCH: seq = " + seq +
                              " lastAckedSeq = " + lastAckedSeq);
                    //outgoingQueue.slide (seq - lastAckedSeq);
                    lastAckedSeq = seq;
		    } else {
                LOG.debug("No retransmit scheduled for " + seq);
            }
            outgoingWindow = window;
            // note the window size the peer wants
		    
		    break;

		case FIN1:
		    // peer at other end wants to close
		    // tell sender to send everything we have, then quit
		    
		    state = CLOSE_WAIT;
		    
		    break;
		}

	    case CLOSE_WAIT:
		switch (type) {
		case FIN2:
		    // other peer flushed all his buffers to us
		    state = CLOSED;
		    
		    break;
		}
	    }
	}

	void sendMsg (int type, long seq, int window) throws IOException {
	    Message msg = bidirPipeService.pipeService.createMessage ();
	    
	    setHeader (msg, type, seq, window);
	    outputPipe.send (msg);

	    if (type != ACK &&  type != SYN_ACK) {
            Object pendingRetransmit = 
                schedulerService.scheduleAction (new RetransmitAction(msg),
						     RETRANSMIT_TIMEOUT);
	    
            pendingRetransmits.put (new Long(seq), pendingRetransmit);
	    }
	}
	
	void sendMsg (Message msg, long seq) throws IOException {
	    	    
	    setHeader (msg, MSG, seq, 0);
	    outputPipe.send (msg);
        
        Object pendingRetransmit = 
                schedulerService.scheduleAction	(new RetransmitAction (msg),
						 RETRANSMIT_TIMEOUT);
	    
        pendingRetransmits.put (new Long(seq), pendingRetransmit);
        
	}
	
	
	void setHeader (Message msg, int type, long seq, int window)
	    throws IOException
	{
	    ByteArrayOutputStream out = new ByteArrayOutputStream (12);
	    DataOutputStream      dout = new DataOutputStream (out);
	    
	    dout.writeLong (seq);
	    dout.writeInt (type);
	    dout.writeInt (window);
	    
	    MessageElement msgElem =
                msg.newMessageElement(RELIABLE_HEADER,
                                      null,
                                      out.toByteArray ());
        msg.addElement(msgElem);
        
	}
    
	class RetransmitAction implements SchedulerService.Action {
	    
	    Message msg;
	    int     retransmits;

	    RetransmitAction (Message msg) {
            this.msg  = msg;
	    }
	    
	    public void perform (SchedulerService ss) {
            long seq = 0;
            try {
                DataInputStream header = 
                    new DataInputStream (
                                   msg.getElement(RELIABLE_HEADER).getStream());

                seq    = header.readLong ();
                int     type   = header.readInt ();
                int     window = header.readInt ();

                LOG.debug("Retransmitting " + seq + " type " + type);
            
                outputPipe.send (msg);
            } catch (IOException e) {
                // nothing to do, will quit when retransmit count maxes out
            }
		
            if (retransmits < MAX_RETRANSMITS) {
                Object pendingRetransmit =
                ss.scheduleAction (this, RETRANSMIT_TIMEOUT);
		    
                pendingRetransmits.put (new Long(seq), pendingRetransmit);
                retransmits++;
            } else {
                // perhaps the connection has been dropped by the other
                // peer
                // is the following right, though?  should we wait?
                state = TIMED_OUT;
                synchronized (lock) {
                    lock.notifyAll ();
             }
         }
	    }
	}
    }
}


class IncomingMessageQueue extends PriorityQueue {
    private final static Category LOG =
                               Category.getInstance(IncomingMessageQueue.class);
    int  capacity;
    int  total;
    long nextExpected;
    
    IncomingMessageQueue (int size) {
        initialize (size, 2);
        this.capacity = size;
    }
    
    
    protected boolean lessThan (Object o1, Object o2) {
        return ((IncomingMessage) o1).seq < ((IncomingMessage) o2).seq;
    }

    public synchronized void put (Message msg, long seq) {
        put (new IncomingMessage (msg, seq));
        total++;
        notifyAll ();
    }

    public synchronized void setNextExpected(long nextExpected) {
        this.nextExpected = nextExpected;
    }

    public synchronized long popUntilBreakInSequence (long lastSeq) {
        IncomingMessage msg;

        while (top () != null) {
            msg = (IncomingMessage) top ();
            // okay we got the top()
            if (lastSeq + 1 == msg.seq) {
                // if it's one more than the previous, pop it off.
                lastSeq = msg.seq;
                pop ();
                total--;
            } else {
                break;
            }
        }

        return lastSeq;
    }

    /** 
     * Wait until the next in-sequence message is pushed, 
     * then return it.
     */
/*
    public synchronized Message pop (long maxTimeout)
                                                   throws InterruptedException {
	
    	IncomingMessage im;
        Timer t = new Timer();
        t.start();
        long timeout;
        while (true) {
            while (top () == null) {
                if(maxTimeout == -1) {
                    wait();
                } else {
                    timeout = maxTimeout - t.elapsed();
                    if (timeout > 0) {
                        wait (timeout);
                    } else {
                        //return null; // should break out of outside loop
                        break;
                    }
                }
            }
	
            im = (IncomingMessage) top ();
            if (im == null) {
                return null;
            } else if (im.seq == nextExpected) {
                LOG.debug("im.seq = " + im.seq + " nextExpected = "
                          + nextExpected);
                pop ();
                total--;
                nextExpected = im.seq + 1;
                return im.msg;  
            } else {
                LOG.debug("im.seq = " + im.seq + " nextExpected = "
                          + nextExpected);
                continue;
            }
        }
    }
*/
    
    public synchronized Message popNext() {
        IncomingMessage im;
        im = (IncomingMessage)top();
        if (im == null) {
            return null;
        }
        // get rid of all the ones less than expected.
        // they are duplicates due to retransmissions
        while (im != null && im.seq < nextExpected) {
           LOG.debug("WATCH: less than expected, nextExpected = " +
               nextExpected + " seq = " + im.seq);
           pop();
           total--;
           im = (IncomingMessage)top();
        }
        
        if (im != null && im.seq == nextExpected) {
            // this is the one we want!
            LOG.debug("WATCH: got expected " + nextExpected);
            pop();
            total--;
            nextExpected += 1;
            return im.msg;
        } else {
            return null;
        }
    }
    
    public synchronized Message pop(long maxTimeout)
                                                throws InterruptedException {
        Timer t = new Timer();
        t.start();
        IncomingMessage im; 
        if(maxTimeout == -1) {
            maxTimeout = Long.MAX_VALUE;
        }// just a big number to get started
        long timeLeft = maxTimeout;
        while (timeLeft > 0) {
           timeLeft = maxTimeout - t.elapsed();
           if (timeLeft < 0 ) {
               break;
           }
           im = (IncomingMessage)top();
           if ( (im == null) || (im.seq > nextExpected) ) {
               // the one we want hasn't arrived yet
               LOG.debug("WATCH: waiting for " + nextExpected);
               wait(timeLeft); // hope we get awakened with the right one
           } else if (im.seq == nextExpected) {
               // this is the one we want!
               LOG.debug("WATCH: got expected " + nextExpected);
               pop();
               total--;
               nextExpected += 1;
               return im.msg;
           } else {
               // seq is less than expected: it must be a dup, so dump it
               LOG.debug("WATCH: less than expected, nextExpected = " +
                   nextExpected + " seq = " + im.seq);
               pop();
               total--;
           }
        }
        // if we get here, we went all the way through the loop without
        // ever returning the right one :-(
        LOG.debug("WATCH: returning null"); 
        return null;
    }
    
    public int getFreeSlots () {
        return capacity - total;
    }

    class IncomingMessage {
	Message msg;
	long    seq;
	
	IncomingMessage (Message msg, long seq) {
	    this.msg = msg;
	    this.seq = seq;
	}
    }

}


class OutgoingMessageQueue {
    private final static Category LOG =
                               Category.getInstance(OutgoingMessageQueue.class);
    Message[] arr;
    int       head;
    int       tail;
    int       total = 0;

    OutgoingMessageQueue (int size) {
        arr = new Message[size];
    }

    synchronized void enqueue (Message msg) throws InterruptedException {

        while (isFull ()) {
            wait ();
        }
        arr[tail] = msg;
        tail = (tail+1) % arr.length;
        total++;
        LOG.debug("WATCH: Total messages on queue = " + total);
	
        notifyAll ();
    }
    
    synchronized void slide (long n) {
        for (int i = 0;  i < n;  i++) {
            arr[head] = null;
            total--;
            head = (head + 1) % arr.length;
        }
    }

    synchronized Message get (int i, long maxTimeout) 
                                                   throws InterruptedException {
        Timer t = new Timer();
        while (total < i) {
            t.start();
            wait (maxTimeout);
            if (t.elapsed() >= maxTimeout) {
                return null;
            }
        }
    	return arr[(head+i) % arr.length];
    }

    synchronized Message dequeue (long maxTimeout) 
                                            throws InterruptedException {
        Message m;
        Timer t = new Timer();
        while (isEmpty ()) {
            t.start();
            wait (maxTimeout);
            if (t.elapsed() >= maxTimeout) {
                return null;
            }
        }

        m = arr[head];

        arr[head] = null;
        head = (head+1) % arr.length;
        total--;
        LOG.debug("WATCH Total messages in outgoing queue is " + total);

        return m;
    }

    boolean isEmpty () {
        return total == 0;
    }

    boolean isFull () {
        return total == arr.length;
    }

    int getMessageCount () {
        return total;
    }

    int getFreeSlots () {
        return arr.length - total;
    }
}


class Timer {

    private final static Category LOG =
                                Category.getInstance(Timer.class);
    long start;

    void start () {
        start = System.currentTimeMillis();
        // LOG.debug("WATCH: System time at start = " + start);
    }

    long elapsed () {
        long result = 0;
        long currTime = System.currentTimeMillis();
        // LOG.debug("WATCH: System time at elapsed = " + currTime);
        result = currTime - start;
        return result;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -