⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 reliablepipeservice.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Copyright (c) 2001 Sun Microsystems, Inc. All rights
 * reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in
 * the documentation and/or other materials provided with the
 * distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 * if any, must include the following acknowledgment:
 * "This product includes software developed by the
 * Sun Microsystems, Inc. for Project JXTA."
 * Alternately, this acknowledgment may appear in the software itself,
 * if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
 * must not be used to endorse or promote products derived from this
 * software without prior written permission. For written
 * permission, please contact Project JXTA at http://www.jxta.org.
 *
 * 5. Products derived from this software may not be called "JXTA",
 * nor may "JXTA" appear in their name, without prior written
 * permission of Sun.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of Project JXTA. For more
 * information on Project JXTA, please see
 * <http://www.jxta.org/>.
 *
 * This license is based on the BSD license adopted by the Apache Foundation.
 *
 * $Id: ReliablePipeService.java,v 1.2 2002/03/04 21:43:01 echtcherbina Exp $
 */
package net.jxta.impl.util;

import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.endpoint.Message;
import net.jxta.protocol.PipeAdvertisement;

import java.io.*;
import java.util.*;

/**
 * This class provides reliable message-delivery pipes.
 * Each message is guaranteed to be delivered and in-order.
 */

public class ReliablePipeService {

    BidirectionalPipeService bidirPipeService;
    SchedulerService         schedulerService;

    static final String RELIABLE_HEADER = "RELIABLE_HEADER";

    static final int    SYN = 1;
    static final int    SYN_ACK = 2;
    static final int    MSG = 3;
    static final int    ACK = 4;
    static final int    FIN1 = 5;
    static final int    FIN2 = 6;

    static final int    ESTABLISHING = 1;
    static final int    CONNECTED    = 2;
    static final int    TIMED_OUT    = 3;
    static final int    CLOSE_WAIT   = 4;
    static final int    CLOSED       = 5;

    static final int    MAX_RETRANSMITS = 5;
    static final int    MSG_TIMEOUT = 30 * 1000;
    // retransmit in this many milliseconds if no ack was received
    static final int    RETRANSMIT_TIMEOUT = 1000;
    // transmit buffered messages if this timeout has expired,
    // even if we don't have enough messages to meet the window size
    static final int    WINDOW_FLUSH_TIMEOUT = 500;

    /**
     * Create a ReliablePipeService instance.
     *
     * @param BidirectionalPipeService The bi-directional pipe service
     * to use.
     */
    public ReliablePipeService (BidirectionalPipeService bidirPipeService) {
	this.bidirPipeService = bidirPipeService;
	this.schedulerService = new SchedulerService ();

	new Thread (schedulerService).start ();
    }

    /**
     * Connect to the peer offering the given PipeAdvertisement.
     *
     * @exception IOException if the connection could not be opened.
     */
    public Pipe connect (PipeAdvertisement adv, int maxTimeout)
	throws IOException
    {
	long start = System.currentTimeMillis ();
	BidirectionalPipeService.Pipe bidirPipe =
	    bidirPipeService.connect (adv, maxTimeout);

	// re-calculate timeout
	maxTimeout -= (System.currentTimeMillis () - maxTimeout);

	if (maxTimeout > 0)
	    return new Pipe (bidirPipe, maxTimeout);
	else
	    throw new IOException ("Connect timed out.");
    }

    /**
     * Advertise a pipe of the given name and get ready to
     * accept incoming connections.
     *
     * @param pipeName the name of the pipe to advertise
     */
    public AcceptPipe bind (String pipeName) throws IOException {
	return new AcceptPipe (bidirPipeService.bind (pipeName));
    }

    /**
     * This is a server-like pipe that can accept incoming
     * reliable bi-directional connections.
     */
    public class AcceptPipe {

	BidirectionalPipeService.AcceptPipe acceptPipe;

	AcceptPipe (BidirectionalPipeService.AcceptPipe acceptPipe) {
	    this.acceptPipe = acceptPipe;
	}

	/**
	 * Get the advertisement under which this pipe known.
	 */
	public PipeAdvertisement getAdvertisement () {
	    return acceptPipe.getAdvertisement ();
	}

	/**
	 * Accept incoming reliable bi-directional connections.
	 */
	// right now we don't do any additional connection set-up,
	// but we probably should.  it'll be done here.
	public Pipe accept (int maxTimeout)
	    throws IOException, InterruptedException
	{
	    long start = System.currentTimeMillis ();
	    BidirectionalPipeService.Pipe bidirPipe =
		acceptPipe.accept (maxTimeout);

	    // re-calculate timeout
	    maxTimeout -= (System.currentTimeMillis () - maxTimeout);

	    if (maxTimeout > 0)
		return new Pipe (bidirPipe, maxTimeout);
	    else
		throw new IOException ("Connect timed out.");

	}
    }

    /**
     * Instances of this class are returned by ReliablePipeService.connect
     * and ReliablePipeService.AcceptPipe.accept.
     */
    public class Pipe implements InputPipe, OutputPipe {
	IncomingMessageQueue incomingQueue;
	IncomingMessageQueue incomingQueue2;
	OutgoingMessageQueue outgoingQueue;
	//PDA requirements 19.02.2002
        //java.util.HashMap -> java.util.Hashtable
        //HashMap              pendingRetransmits;
        Hashtable              pendingRetransmits;
	//PDA requirements 19.02.2002

	long outgoingSeq;  // sequence counter for outgoing sequences
	long highestInSeq; // highest in-sequence message received
	long lastAckedSeq; // the last acknowledged message

	InputPipe  inputPipe;
	OutputPipe outputPipe;

	Receiver receiver;
	Sender   sender;
	int      outgoingWindow;

	int      state;
	boolean  running;

	Pipe (BidirectionalPipeService.Pipe bidirPipe, long maxTimeout)
	    throws IOException
	{
	    inputPipe  = bidirPipe.getInputPipe ();
	    outputPipe = bidirPipe.getOutputPipe ();

	    incomingQueue  = new IncomingMessageQueue (512);
	    incomingQueue2 = new IncomingMessageQueue (512);
	    outgoingQueue  = new OutgoingMessageQueue (512);


            //PDA requirements 19.02.2002
            //java.util.HashMap -> java.util.Hashtable
	    //pendingRetransmits = new HashMap ();
            pendingRetransmits = new Hashtable();
            //PDA requirements 19.02.2002

	    state = ESTABLISHING;

	    establish (maxTimeout);

	    new Thread (receiver = new Receiver ()).start ();

	}

	/**
	 * Get the InputPipe on which messages are guaranteed
	 * to be delivered in-order.
	 */
	public InputPipe getInputPipe () {
	    return this;
	}

	/**
	 * Get the OutputPipe which guarnatees that all messages
	 * sent through it will be delivered in-order.
	 */
	public OutputPipe getOutputPipe () {
	    return this;
	}

	void establish (long maxTimeout) throws IOException {

	    sendMsg (SYN, 0, 0);

	    try {
		while (state == ESTABLISHING)
		    wait (maxTimeout);
	    } catch (InterruptedException e) {
		throw new IOException ("Interrupted while establishing " +
				       "connection.");
	    }

	    if (state == CONNECTED)
		// start the sender
		new Thread (sender = new Sender ()).start ();
	    else
		throw new IOException ("Connect timed out.");
	}

	/**
	 * Initiate connection tear-down protocol.  All messages
	 * already buffered by send() are guaranteed to be delivered.
	 */
	public void close () {

	    if (state != CLOSE_WAIT) {
		// tell sender to flush all remaining messages
		state = CLOSE_WAIT;

		// tell the other side to flush everything to us
		try {
		    sendMsg (FIN1, -1, 0);
		} catch (IOException e) {

		}

		// remaining messages will be sent asynchronously
	    }
	}

	public synchronized void send (Message msg)
	    throws IOException
	{
	    switch (state) {
	    case ESTABLISHING:
		throw new Error ("Should never be here.  Theis state is " +
				 "only while the contructor is running.");
	    case CONNECTED:
		try {
		    outgoingQueue.enqueue (msg);
		} catch (InterruptedException e) {
		    throw new IOException ("Thread was interrupted while in " +
					   "send(): " + e.getMessage ());
		}
		break;
	    case TIMED_OUT:
		throw new IOException ("The peer at the other end has " +
				       "dropped the connection.");
	    case CLOSE_WAIT:
		throw new IOException ("Shutdown in progress.");
	    case CLOSED:
		throw new IOException ("Connection has closed.");
	    }
	}


	class Receiver implements Runnable {

	    public void run () {
		Message msg;

		while (state != CLOSED) {
		    try {
			msg = inputPipe.poll (MSG_TIMEOUT);

			if (! running)
			    break;
			else if (msg != null)
			    process (msg);

		    } catch (InterruptedException e) {
			break;
		    } catch (IOException e) {
			e.printStackTrace ();
		    }
		}
	    }
	}



	class Sender implements Runnable {

	    public void run () {

		Timer   t = new Timer ();
		Message m;


		while (state != CLOSE_WAIT || ! outgoingQueue.isEmpty ()) {

		    t.start ();

		    for (int i = 0;  i < outgoingWindow;  i++) {

			try {
			    m = outgoingQueue.get (i,
						   WINDOW_FLUSH_TIMEOUT -
						   t.elapsed ());

			    if (m == null)
				break;
			    else
				try {
				    sendMsg (m, outgoingSeq++);
				} catch (IOException e) {
				    // is taken care of automatically
				    // by retransmits
				}
			} catch (InterruptedException e) {
			    break;
			}

		    }
		}

		if (state == CLOSE_WAIT)
		    try {
			sendMsg (FIN2, outgoingSeq++, 0);
		    } catch (IOException e) {

		    }
	    }
	}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -