⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliablepipeservice.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Copyright (c) 2001 Sun Microsystems, Inc. All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in
 * the documentation and/or other materials provided with the
 * distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 * if any, must include the following acknowledgment:
 * "This product includes software developed by the
 * Sun Microsystems, Inc. for Project JXTA."
 * Alternately, this acknowledgment may appear in the software itself,
 * if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
 * must not be used to endorse or promote products derived from this
 * software without prior written permission. For written
 * permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 * nor may "JXTA" appear in their name, without prior written
 * permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA. For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 * This license is based on the BSD license adopted by the Apache Foundation.
 *
 * $Id: ReliablePipeService.java,v 1.8 2002/06/11 17:02:12 thomas Exp $
 */
package net.jxta.impl.util;

import net.jxta.pipe.*;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.protocol.PipeAdvertisement;

import java.io.*;
import java.util.*;
import org.apache.log4j.Category;

/**
 * This class provides reliable message-delivery pipes.
 * Each message is guaranteed to be delivered and in-order.
 */

public class ReliablePipeService {

    // log4j category used for all diagnostics of this class
    private final static Category LOG =
                                Category.getInstance(ReliablePipeService.class);

    // underlying service on which connect() and bind() open their pipes
    BidirectionalPipeService bidirPipeService;
    // created by the constructor and run on its own thread
    SchedulerService         schedulerService;
    // name of the message element from which Pipe.process() reads the
    // header: a long sequence number, an int type and an int window
    static final String RELIABLE_HEADER = "RELIABLE_HEADER";
    
    // Message types carried in the header.  Pipe.process() does not queue
    // SYN, SYN_ACK and ACK; every other type is put on the incoming queue.
    static final int    SYN = 1;      // sent by Pipe.establish()
    static final int    SYN_ACK = 2;
    static final int    MSG = 3;
    static final int    ACK = 4;
    static final int    FIN1 = 5;     // sent by Pipe.close()
    static final int    FIN2 = 6;     // sent by the Sender once the state is CLOSE_WAIT

    // Values of Pipe.state.
    static final int    ESTABLISHING = 1; // set by the Pipe constructor
    static final int    CONNECTED    = 2; // establish() starts the Sender; send() accepts messages
    static final int    TIMED_OUT    = 3; // send() reports that the peer dropped the connection
    static final int    CLOSE_WAIT   = 4; // set by Pipe.close(); send() refuses new messages
    static final int    CLOSED       = 5; // the Receiver loop ends in this state

    // NOTE(review): not referenced in the visible part of the file;
    // presumably the retry limit for unacknowledged messages -- confirm
    static final int    MAX_RETRANSMITS = 5;
    // timeout the Receiver passes to inputPipe.poll()
    static final int    MSG_TIMEOUT = 30 * 1000;
    // retransmit in this many milliseconds if no ack was received
    static final int    RETRANSMIT_TIMEOUT = 1000;
    // transmit buffered messages if this timeout has expired,
    // even if we don't have enough messages to meet the window size
    static final int    WINDOW_FLUSH_TIMEOUT = 500;
        
    /**
     * Create a ReliablePipeService instance.
     * 
     * @param BidirectionalPipeService The bi-directional pipe service
     * to use.
     */
    public ReliablePipeService (BidirectionalPipeService bidirPipeService) {
	this.bidirPipeService = bidirPipeService;
	this.schedulerService = new SchedulerService ();

	new Thread (schedulerService).start ();
    }

    /**
     * Opens a reliable connection to the peer offering the given
     * PipeAdvertisement.
     *
     * The time spent opening the underlying bi-directional pipe is
     * subtracted from maxTimeout; only what is left is handed to the
     * reliable Pipe for its own connection set-up.
     *
     * @param adv the advertisement of the pipe to connect to
     * @param maxTimeout the overall time budget for the connection attempt
     * @return the reliable pipe built on the underlying connection
     * @exception IOException if the connection could not be opened, or
     * if no time is left once the underlying pipe is available
     */
    public Pipe connect (PipeAdvertisement adv, int maxTimeout)
                                                            throws IOException {
        LOG.debug("WATCH: REALLY Connecting...");

        Timer stopwatch = new Timer();
        stopwatch.start();
        BidirectionalPipeService.Pipe rawPipe =
                                    bidirPipeService.connect (adv, maxTimeout);
        LOG.debug("WATCH: got bidirPipe");

        // whatever the underlying connect consumed is no longer available
        int remaining = maxTimeout;
        remaining -= stopwatch.elapsed();
        if (remaining <= 0) {
            throw new IOException ("Connect timed out.");
        }

        LOG.debug("WATCH: creating new Pipe");
        return new Pipe (rawPipe, remaining);
    }
    
    /**
     * Binds a pipe of the given name on the underlying bi-directional
     * pipe service and wraps the result so that incoming reliable
     * connections can be accepted on it.
     *
     * @param pipeName the name of the pipe to advertise
     * @return an AcceptPipe wrapping the bound bi-directional pipe
     * @exception IOException if the bind call declares a failure
     */
    public AcceptPipe bind (String pipeName) throws IOException {
        BidirectionalPipeService.AcceptPipe rawAcceptPipe =
                                            bidirPipeService.bind (pipeName);
        return new AcceptPipe (rawAcceptPipe);
    }
    
    /**
     * Binds the pipe described by the given advertisement on the
     * underlying bi-directional pipe service and wraps the result in
     * an AcceptPipe.
     *
     * @param inputPipeAdv the advertisement passed on to the underlying
     * bind call
     * @return an AcceptPipe wrapping the bound bi-directional pipe
     * @exception IOException if the bind call declares a failure
     */
    public AcceptPipe bind(PipeAdvertisement inputPipeAdv) throws IOException {
        BidirectionalPipeService.AcceptPipe rawAcceptPipe =
                                        bidirPipeService.bind(inputPipeAdv);
        return new AcceptPipe(rawAcceptPipe);
    }
    
    /**
     * A server-like pipe: it wraps a bi-directional accept pipe and
     * turns every connection accepted on it into a reliable Pipe.
     */
    public class AcceptPipe {

        BidirectionalPipeService.AcceptPipe acceptPipe;

        AcceptPipe (BidirectionalPipeService.AcceptPipe acceptPipe) {
            this.acceptPipe = acceptPipe;
        }

        /**
         * Returns the advertisement under which this pipe is known,
         * as reported by the wrapped accept pipe.
         */
        public PipeAdvertisement getAdvertisement () {
            return this.acceptPipe.getAdvertisement ();
        }

        /**
         * Accepts one incoming reliable bi-directional connection.
         *
         * The time spent waiting on the underlying accept pipe is
         * subtracted from maxTimeout; only what is left is handed to
         * the reliable Pipe for its own connection set-up.
         *
         * @param maxTimeout the overall time budget for accepting
         * @return the reliable pipe built on the accepted connection
         * @exception IOException if no time is left once the underlying
         * connection is available, or if setting up the Pipe fails
         * @exception InterruptedException declared for the underlying
         * accept call
         */
        // right now we don't do any additional connection set-up,
        // but we probably should.  it'll be done here.
        public Pipe accept (int maxTimeout)
                                      throws IOException, InterruptedException {
            LOG.debug("WATCH: REALLY waiting for connections....");

            Timer stopwatch = new Timer();
            stopwatch.start();
            BidirectionalPipeService.Pipe rawPipe =
                                                 acceptPipe.accept (maxTimeout);
            LOG.debug("WATCH: got bidirPipe..." + rawPipe);

            // whatever the underlying accept consumed is no longer available
            int remaining = maxTimeout;
            remaining -= stopwatch.elapsed();
            LOG.debug("WATCH: maxTimeout = " + remaining);

            if (remaining <= 0) {
                throw new IOException ("Connect timed out.");
            }

            LOG.debug("WATCH: creating new Pipe...");
            return new Pipe (rawPipe, remaining);
        }
    }
    /**
     * Instances of this class are returned by ReliablePipeService.connect
     * and ReliablePipeService.AcceptPipe.accept.
     */
    public class Pipe implements InputPipe, OutputPipe {
	// Message queues; all three are created in the constructor.
	IncomingMessageQueue incomingQueue;   // read by poll() and notifyListeners()
	IncomingMessageQueue incomingQueue2;  // process() puts sequenced messages here
	OutgoingMessageQueue outgoingQueue;   // filled by send(), drained by the Sender
	HashMap              pendingRetransmits;

	long outgoingSeq;  // sequence counter for outgoing sequences
	long highestInSeq; // highest in-sequence message received
	long lastAckedSeq; // the last acknowledged message
	long synSeq = -1;  // the sequence number sent with SYN

	// the two directions of the underlying bi-directional pipe
	InputPipe  inputPipe;
	OutputPipe outputPipe;

	Receiver receiver;
	Sender   sender;
	// upper bound on the messages the Sender transmits per pass
	// NOTE(review): the writer of this field is not in the visible part
	// of the file -- confirm it is safely published to the Sender thread
	int      outgoingWindow;

	// One of ESTABLISHING, CONNECTED, TIMED_OUT, CLOSE_WAIT, CLOSED.
	// Volatile because it is written by close() on the caller's thread
	// and read in the Receiver and Sender loops and in establish()
	// without any common lock; otherwise a change may never be seen.
	volatile int      state;
	// The Receiver stops after its next poll once this is false.
	// Volatile because the flag is read on the Receiver thread.
	volatile boolean  running = false;
	// monitor on which establish() waits for the state to change
	Object lock = new Object();
	// registered PipeMsgListener instances
	Vector listeners = new Vector();

	Pipe (BidirectionalPipeService.Pipe bidirPipe, long maxTimeout) 
                                                            throws IOException {
        LOG.debug("WATCH: Pipe constructor");
	    inputPipe  = bidirPipe.getInputPipe ();
	    outputPipe = bidirPipe.getOutputPipe ();

	    incomingQueue  = new IncomingMessageQueue (512);
	    incomingQueue2 = new IncomingMessageQueue (512);
	    outgoingQueue  = new OutgoingMessageQueue (512);
	    pendingRetransmits = new HashMap ();
	    state = ESTABLISHING;
        LOG.debug("WATCH: starting receiver");
        running = true;
	    new Thread (receiver = new Receiver ()).start ();
	    establish (maxTimeout);
	}

	/**
	 * Returns the InputPipe view of this pipe, on which messages are
	 * guaranteed to be delivered in-order.  The pipe object itself
	 * implements InputPipe, so no separate object is created.
	 */
	public InputPipe getInputPipe () {
	    return Pipe.this;
	}
	
	/**
	 * Returns the OutputPipe view of this pipe, which guarantees that
	 * all messages sent through it will be delivered in-order.  The
	 * pipe object itself implements OutputPipe, so no separate object
	 * is created.
	 */
	public OutputPipe getOutputPipe () {
	    return Pipe.this;
	}

	/**
	 * Sends SYN and blocks until the pipe leaves the ESTABLISHING state
	 * or maxTimeout milliseconds have passed, whichever comes first.
	 * When the pipe ends up CONNECTED the Sender thread is started.
	 *
	 * The wait is bounded by the time elapsed since the SYN was sent;
	 * a wait that merely times out (or wakes up spuriously) therefore
	 * cannot restart the full timeout.  The state is tested while
	 * holding the lock the method waits on.
	 *
	 * @param maxTimeout the longest time to wait, in milliseconds
	 * @exception IOException if the SYN cannot be sent, the thread is
	 * interrupted while waiting, or the pipe is not CONNECTED when the
	 * wait ends
	 */
	void establish (long maxTimeout) throws IOException {

	    LOG.debug("WATCH: sending SYN...");
	    sendMsg (SYN, synSeq, 0);

	    long started = System.currentTimeMillis ();
	    try {
		synchronized (lock) {
		    while (state == ESTABLISHING) {
			long remaining =
			    maxTimeout - (System.currentTimeMillis () - started);
			if (remaining <= 0) {
			    // time budget used up, report the timeout below
			    break;
			}
			lock.wait (remaining);
		    }
		}
	    } catch (InterruptedException e) {
		throw new IOException ("Interrupted while establishing " +
				       "connection.");
	    }

	    if (state == CONNECTED) {
		// start the sender
		LOG.debug("WATCH: starting sender...");
		new Thread (sender = new Sender ()).start ();
	    } else {
		throw new IOException ("Connect timed out.");
	    }
	}

	/**
	 * Initiate connection tear-down protocol.  All messages
	 * already buffered by send() are guaranteed to be delivered.
	 *
	 * Moves the pipe to CLOSE_WAIT and sends FIN1 to the other side.
	 * Calling it again while the pipe is in CLOSE_WAIT does nothing.
	 * A failure to send FIN1 does not abort the tear-down; it is
	 * logged instead of being silently dropped.
	 */
	public void close () {

	    if (state == CLOSE_WAIT) {
		// tear-down has already been requested
		return;
	    }

	    // tell sender to flush all remaining messages
	    state = CLOSE_WAIT;

	    // tell the other side to flush everything to us
	    try {
		sendMsg (FIN1, -1, 0);
	    } catch (IOException e) {
		// best effort: closing goes on, but leave a trace
		LOG.warn ("Could not send FIN1 while closing the pipe.", e);
	    }

	    // remaining messages will be sent asynchronously
	}

	/**
	 * Queues a message for transmission by the Sender thread.
	 *
	 * Only a CONNECTED pipe accepts messages.  In the TIMED_OUT,
	 * CLOSE_WAIT and CLOSED states an IOException describing the state
	 * is thrown, and ESTABLISHING is treated as a programming error.
	 * Any other state value leaves the call without effect.
	 *
	 * @param msg the message to put on the outgoing queue
	 * @exception IOException if the pipe is not CONNECTED, or if the
	 * thread is interrupted while enqueuing
	 */
	public synchronized void send (Message msg) 
	    throws IOException
	{
	    // look at the state once, exactly like a switch would
	    final int current = state;

	    if (current == ESTABLISHING) {
		throw new Error ("Should never be here.  Theis state is " +
				 "only while the contructor is running.");
	    }
	    if (current == TIMED_OUT) {
		throw new IOException ("The peer at the other end has " +
				       "dropped the connection.");
	    }
	    if (current == CLOSE_WAIT) {
		throw new IOException ("Shutdown in progress.");
	    }
	    if (current == CLOSED) {
		throw new IOException ("Connection has closed.");
	    }
	    if (current != CONNECTED) {
		// unknown state value: nothing to do
		return;
	    }

	    LOG.debug("WATCH: enqueuing message...");
	    try {
		outgoingQueue.enqueue (msg);
	    } catch (InterruptedException e) {
		throw new IOException ("Thread was interrupted while in " +
				       "send(): " + e.getMessage ());
	    }
	}

	
	class Receiver implements Runnable {
	    
	    public void run () {
		Message msg;
		
		while (state != CLOSED) {
		    try {
                msg = inputPipe.poll (MSG_TIMEOUT);
                LOG.debug("WATCH: got message " + msg);
                if (!running) {
                    break;
                } else if (msg != null) {
                    LOG.debug("WATCH: message not null, processing");
                    process (msg);                }
		    } catch (InterruptedException e) {
                break;
		    } catch (IOException e) {
                e.printStackTrace ();
		    }
		}
	    }
	}
    

	
	/**
	 * Runnable that takes messages off outgoingQueue and transmits them
	 * with increasing sequence numbers until the pipe enters CLOSE_WAIT;
	 * it then sends FIN2 and ends.
	 *
	 * NOTE(review): the loop tests only for CLOSE_WAIT; whether it is
	 * meant to end in states such as CLOSED or TIMED_OUT as well cannot
	 * be told from here -- confirm.
	 */
	class Sender implements Runnable {

	    public void run () {
		
		Message m;

		LOG.debug("WATCH: sending messages on queue");
        //synchronized (outgoingQueue) {
            // one pass of this loop sends at most outgoingWindow messages
            while (state != CLOSE_WAIT  /* && !outgoingQueue.isEmpty () */  ) {
                // measures how long the current pass has been running
                Timer   t = new Timer ();
                t.start ();
                LOG.debug("WATCH: outgoingWindow = " + outgoingWindow);
                int i;
                for (i = 0;  i < outgoingWindow;  i++) {
                    try {
                        LOG.debug("WATCH: getting message off the queue, i = " + i);
                        LOG.debug("WATCH: elapsed time = " + t.elapsed());
                        // what is left of the flush timeout for this pass
                        long timeLeft = WINDOW_FLUSH_TIMEOUT - t.elapsed();
                        
                        if(timeLeft > 0 ) {
                            // still inside the flush timeout: dequeue with
                            // the remaining time as timeout
                            m = outgoingQueue.dequeue(timeLeft);
                        } else if(!outgoingQueue.isEmpty()) {
                            // timeout used up, but messages are queued
                            m = outgoingQueue.dequeue(0);
                        } else {
                            // timeout used up and nothing queued: end the pass
                            break;
                        }
                        
                        /*
                         m = outgoingQueue.get (i,timeLeft);
                         */
                        if (m == null) {
                            LOG.debug("WATCH: No message :-(");
                            break;
                        } else {
                            try {
                                // NOTE(review): "count" is read only for this
                                // debug line; looks application specific
                                LOG.debug("WATCH: sending message " + outgoingSeq + " ping = " + m.getString("count"));
                                sendMsg (m, outgoingSeq++);
                            } catch (IOException e) {
                                // is taken care of automatically
                                // by retransmits
                            }
                        }
                    } catch (InterruptedException e) {
                        // leaves only the for loop; the enclosing while
                        // loop goes on with a new pass
                        break;
                    }
                }
           // }
        }
		
		// the loop above ends only in CLOSE_WAIT: tell the peer we are done
		if (state == CLOSE_WAIT)
		    try {
			sendMsg (FIN2, outgoingSeq++, 0);
		    } catch (IOException e) {
			// a failure to send FIN2 is ignored
		    }
	    }
	}
	

	/**
	 * Returns the next incoming message by delegating to poll() with a
	 * timeout argument of -1.
	 *
	 * @exception InterruptedException passed on from poll()
	 */
	public synchronized Message waitForMessage () throws InterruptedException 
	{
	    Message next = poll (-1);
	    return next;
	}

	/**
	 * Takes the next message from the incoming queue, passing
	 * msgTimeout on to the queue's pop().
	 *
	 * NOTE(review): this method holds the Pipe monitor for the whole
	 * call, and send() and the listener registration methods use the
	 * same monitor; if pop() can block, those calls are held up for as
	 * long as it does -- confirm.
	 *
	 * @param msgTimeout the timeout handed to the incoming queue
	 * @return whatever the incoming queue's pop() returns
	 * @exception InterruptedException passed on from the queue
	 */
	public synchronized Message poll (int msgTimeout) 
                                                throws InterruptedException {
	    Message next = incomingQueue.pop (msgTimeout);
	    return next;
	}

	
    /**
     * Registers a listener for incoming messages.  A listener that is
     * already registered is not added a second time.
     *
     * @param listener the listener to register
     */
    public synchronized void addPipeMsgListener(PipeMsgListener listener) {
        boolean alreadyRegistered = listeners.contains(listener);
        if (alreadyRegistered) {
            return;
        }
        listeners.add(listener);
    }
    
    /**
     * Unregisters a listener.  Nothing happens if the listener is not
     * registered.
     *
     * @param listener the listener to remove
     */
    public synchronized void removePipeMsgListener(PipeMsgListener listener) {
        // same effect as Vector.remove(Object)
        listeners.removeElement(listener);
    }
    
    
    /**
     * Takes messages off the incoming queue with popNext() until it
     * returns null and delivers each of them to the registered
     * listeners as a PipeMsgEvent.
     *
     * Each message is delivered to a snapshot of the listener list
     * taken just before delivery.  Listeners may therefore be added or
     * removed while a delivery is in progress, even from within a
     * callback, without causing a ConcurrentModificationException;
     * such a change takes effect with the next message.
     */
    void notifyListeners() {
        Message msg = incomingQueue.popNext();
        while (msg != null) {
            PipeMsgEvent pipeMsgEvent =
                           new PipeMsgEvent(this,msg,null);

            // Vector.toArray() copies the elements under the vector's lock
            Object[] snapshot = listeners.toArray();
            for (int i = 0; i < snapshot.length; i++) {
                PipeMsgListener listener = (PipeMsgListener) snapshot[i];
                listener.pipeMsgEvent(pipeMsgEvent);
            }

            msg = incomingQueue.popNext();
        }
    }
    
	void process (Message msg) throws IOException {
	    
        DataInputStream header = 
            new DataInputStream (msg.getElement(RELIABLE_HEADER).getStream());
	    
	    long    seq    = header.readLong ();
	    int     type   = header.readInt ();
	    int     window = header.readInt ();


	    // ack the message
	    if (type != SYN  &&  type != SYN_ACK  &&  type != ACK) {
            incomingQueue2.put (msg, seq);
		
            highestInSeq = 
                incomingQueue2.popUntilBreakInSequence (highestInSeq);
            LOG.debug("WATCH: Sending ACK " + highestInSeq);
            LOG.debug("WATCH: Free Slots = " + incomingQueue.getFreeSlots ());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -