📄 reliablepipeservice.java
字号:
/*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*
* $Id: ReliablePipeService.java,v 1.2 2002/03/04 21:43:01 echtcherbina Exp $
*/
package net.jxta.impl.util;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.OutputPipe;
import net.jxta.endpoint.Message;
import net.jxta.protocol.PipeAdvertisement;
import java.io.*;
import java.util.*;
/**
* This class provides reliable message-delivery pipes.
* Each message is guaranteed to be delivered and in-order.
*/
public class ReliablePipeService {
BidirectionalPipeService bidirPipeService;
SchedulerService schedulerService;
static final String RELIABLE_HEADER = "RELIABLE_HEADER";
static final int SYN = 1;
static final int SYN_ACK = 2;
static final int MSG = 3;
static final int ACK = 4;
static final int FIN1 = 5;
static final int FIN2 = 6;
static final int ESTABLISHING = 1;
static final int CONNECTED = 2;
static final int TIMED_OUT = 3;
static final int CLOSE_WAIT = 4;
static final int CLOSED = 5;
static final int MAX_RETRANSMITS = 5;
static final int MSG_TIMEOUT = 30 * 1000;
// retransmit in this many milliseconds if no ack was received
static final int RETRANSMIT_TIMEOUT = 1000;
// transmit buffered messages if this timeout has expired,
// even if we don't have enough messages to meet the window size
static final int WINDOW_FLUSH_TIMEOUT = 500;
/**
* Create a ReliablePipeService instance.
*
* @param BidirectionalPipeService The bi-directional pipe service
* to use.
*/
public ReliablePipeService (BidirectionalPipeService bidirPipeService) {
this.bidirPipeService = bidirPipeService;
this.schedulerService = new SchedulerService ();
new Thread (schedulerService).start ();
}
/**
* Connect to the peer offering the given PipeAdvertisement.
*
* @exception IOException if the connection could not be opened.
*/
public Pipe connect (PipeAdvertisement adv, int maxTimeout)
throws IOException
{
long start = System.currentTimeMillis ();
BidirectionalPipeService.Pipe bidirPipe =
bidirPipeService.connect (adv, maxTimeout);
// re-calculate timeout
maxTimeout -= (System.currentTimeMillis () - maxTimeout);
if (maxTimeout > 0)
return new Pipe (bidirPipe, maxTimeout);
else
throw new IOException ("Connect timed out.");
}
/**
* Advertise a pipe of the given name and get ready to
* accept incoming connections.
*
* @param pipeName the name of the pipe to advertise
*/
public AcceptPipe bind (String pipeName) throws IOException {
return new AcceptPipe (bidirPipeService.bind (pipeName));
}
/**
* This is a server-like pipe that can accept incoming
* reliable bi-directional connections.
*/
public class AcceptPipe {
BidirectionalPipeService.AcceptPipe acceptPipe;
AcceptPipe (BidirectionalPipeService.AcceptPipe acceptPipe) {
this.acceptPipe = acceptPipe;
}
/**
* Get the advertisement under which this pipe known.
*/
public PipeAdvertisement getAdvertisement () {
return acceptPipe.getAdvertisement ();
}
/**
* Accept incoming reliable bi-directional connections.
*/
// right now we don't do any additional connection set-up,
// but we probably should. it'll be done here.
public Pipe accept (int maxTimeout)
throws IOException, InterruptedException
{
long start = System.currentTimeMillis ();
BidirectionalPipeService.Pipe bidirPipe =
acceptPipe.accept (maxTimeout);
// re-calculate timeout
maxTimeout -= (System.currentTimeMillis () - maxTimeout);
if (maxTimeout > 0)
return new Pipe (bidirPipe, maxTimeout);
else
throw new IOException ("Connect timed out.");
}
}
/**
* Instances of this class are returned by ReliablePipeService.connect
* and ReliablePipeService.AcceptPipe.accept.
*/
public class Pipe implements InputPipe, OutputPipe {
IncomingMessageQueue incomingQueue;
IncomingMessageQueue incomingQueue2;
OutgoingMessageQueue outgoingQueue;
//PDA requirements 19.02.2002
//java.util.HashMap -> java.util.Hashtable
//HashMap pendingRetransmits;
Hashtable pendingRetransmits;
//PDA requirements 19.02.2002
long outgoingSeq; // sequence counter for outgoing sequences
long highestInSeq; // highest in-sequence message received
long lastAckedSeq; // the last acknowledged message
InputPipe inputPipe;
OutputPipe outputPipe;
Receiver receiver;
Sender sender;
int outgoingWindow;
int state;
boolean running;
Pipe (BidirectionalPipeService.Pipe bidirPipe, long maxTimeout)
throws IOException
{
inputPipe = bidirPipe.getInputPipe ();
outputPipe = bidirPipe.getOutputPipe ();
incomingQueue = new IncomingMessageQueue (512);
incomingQueue2 = new IncomingMessageQueue (512);
outgoingQueue = new OutgoingMessageQueue (512);
//PDA requirements 19.02.2002
//java.util.HashMap -> java.util.Hashtable
//pendingRetransmits = new HashMap ();
pendingRetransmits = new Hashtable();
//PDA requirements 19.02.2002
state = ESTABLISHING;
establish (maxTimeout);
new Thread (receiver = new Receiver ()).start ();
}
/**
* Get the InputPipe on which messages are guaranteed
* to be delivered in-order.
*/
public InputPipe getInputPipe () {
return this;
}
/**
* Get the OutputPipe which guarnatees that all messages
* sent through it will be delivered in-order.
*/
public OutputPipe getOutputPipe () {
return this;
}
void establish (long maxTimeout) throws IOException {
sendMsg (SYN, 0, 0);
try {
while (state == ESTABLISHING)
wait (maxTimeout);
} catch (InterruptedException e) {
throw new IOException ("Interrupted while establishing " +
"connection.");
}
if (state == CONNECTED)
// start the sender
new Thread (sender = new Sender ()).start ();
else
throw new IOException ("Connect timed out.");
}
/**
* Initiate connection tear-down protocol. All messages
* already buffered by send() are guaranteed to be delivered.
*/
public void close () {
if (state != CLOSE_WAIT) {
// tell sender to flush all remaining messages
state = CLOSE_WAIT;
// tell the other side to flush everything to us
try {
sendMsg (FIN1, -1, 0);
} catch (IOException e) {
}
// remaining messages will be sent asynchronously
}
}
public synchronized void send (Message msg)
throws IOException
{
switch (state) {
case ESTABLISHING:
throw new Error ("Should never be here. Theis state is " +
"only while the contructor is running.");
case CONNECTED:
try {
outgoingQueue.enqueue (msg);
} catch (InterruptedException e) {
throw new IOException ("Thread was interrupted while in " +
"send(): " + e.getMessage ());
}
break;
case TIMED_OUT:
throw new IOException ("The peer at the other end has " +
"dropped the connection.");
case CLOSE_WAIT:
throw new IOException ("Shutdown in progress.");
case CLOSED:
throw new IOException ("Connection has closed.");
}
}
class Receiver implements Runnable {
public void run () {
Message msg;
while (state != CLOSED) {
try {
msg = inputPipe.poll (MSG_TIMEOUT);
if (! running)
break;
else if (msg != null)
process (msg);
} catch (InterruptedException e) {
break;
} catch (IOException e) {
e.printStackTrace ();
}
}
}
}
class Sender implements Runnable {
public void run () {
Timer t = new Timer ();
Message m;
while (state != CLOSE_WAIT || ! outgoingQueue.isEmpty ()) {
t.start ();
for (int i = 0; i < outgoingWindow; i++) {
try {
m = outgoingQueue.get (i,
WINDOW_FLUSH_TIMEOUT -
t.elapsed ());
if (m == null)
break;
else
try {
sendMsg (m, outgoingSeq++);
} catch (IOException e) {
// is taken care of automatically
// by retransmits
}
} catch (InterruptedException e) {
break;
}
}
}
if (state == CLOSE_WAIT)
try {
sendMsg (FIN2, outgoingSeq++, 0);
} catch (IOException e) {
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -