📄 reliablepipeservice.java
字号:
/*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
*====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*
* $Id: ReliablePipeService.java,v 1.8 2002/06/11 17:02:12 thomas Exp $
*/
package net.jxta.impl.util;
import net.jxta.pipe.*;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.protocol.PipeAdvertisement;
import java.io.*;
import java.util.*;
import org.apache.log4j.Category;
/**
* This class provides reliable message-delivery pipes.
* Each message is guaranteed to be delivered and in-order.
*/
public class ReliablePipeService {
private final static Category LOG =
Category.getInstance(ReliablePipeService.class);
BidirectionalPipeService bidirPipeService;
SchedulerService schedulerService;
static final String RELIABLE_HEADER = "RELIABLE_HEADER";
static final int SYN = 1;
static final int SYN_ACK = 2;
static final int MSG = 3;
static final int ACK = 4;
static final int FIN1 = 5;
static final int FIN2 = 6;
static final int ESTABLISHING = 1;
static final int CONNECTED = 2;
static final int TIMED_OUT = 3;
static final int CLOSE_WAIT = 4;
static final int CLOSED = 5;
static final int MAX_RETRANSMITS = 5;
static final int MSG_TIMEOUT = 30 * 1000;
// retransmit in this many milliseconds if no ack was received
static final int RETRANSMIT_TIMEOUT = 1000;
// transmit buffered messages if this timeout has expired,
// even if we don't have enough messages to meet the window size
static final int WINDOW_FLUSH_TIMEOUT = 500;
/**
* Create a ReliablePipeService instance.
*
* @param BidirectionalPipeService The bi-directional pipe service
* to use.
*/
public ReliablePipeService (BidirectionalPipeService bidirPipeService) {
this.bidirPipeService = bidirPipeService;
this.schedulerService = new SchedulerService ();
new Thread (schedulerService).start ();
}
/**
* Connect to the peer offering the given PipeAdvertisement.
*
* @exception IOException if the connection could not be opened.
*/
public Pipe connect (PipeAdvertisement adv, int maxTimeout)
throws IOException {
LOG.debug("WATCH: REALLY Connecting...");
Timer t = new Timer();
t.start();
BidirectionalPipeService.Pipe bidirPipe =
bidirPipeService.connect (adv, maxTimeout);
LOG.debug("WATCH: got bidirPipe");
// re-calculate timeout
maxTimeout -= t.elapsed();
if (maxTimeout > 0) {
LOG.debug("WATCH: creating new Pipe");
return new Pipe (bidirPipe, maxTimeout);
} else {
throw new IOException ("Connect timed out.");
}
}
/**
* Advertise a pipe of the given name and get ready to
* accept incoming connections.
*
* @param pipeName the name of the pipe to advertise
*/
public AcceptPipe bind (String pipeName) throws IOException {
return new AcceptPipe (bidirPipeService.bind (pipeName));
}
public AcceptPipe bind(PipeAdvertisement inputPipeAdv) throws IOException {
return new AcceptPipe(bidirPipeService.bind(inputPipeAdv));
}
/**
* This is a server-like pipe that can accept incoming
* reliable bi-directional connections.
*/
public class AcceptPipe {
BidirectionalPipeService.AcceptPipe acceptPipe;
AcceptPipe (BidirectionalPipeService.AcceptPipe acceptPipe) {
this.acceptPipe = acceptPipe;
}
/**
* Get the advertisement under which this pipe known.
*/
public PipeAdvertisement getAdvertisement () {
return acceptPipe.getAdvertisement ();
}
/**
* Accept incoming reliable bi-directional connections.
*/
// right now we don't do any additional connection set-up,
// but we probably should. it'll be done here.
public Pipe accept (int maxTimeout)
throws IOException, InterruptedException {
LOG.debug("WATCH: REALLY waiting for connections....");
Timer t = new Timer();
t.start();
BidirectionalPipeService.Pipe bidirPipe =
acceptPipe.accept (maxTimeout);
LOG.debug("WATCH: got bidirPipe..." + bidirPipe);
// re-calculate timeout
maxTimeout -= t.elapsed();
LOG.debug("WATCH: maxTimeout = " + maxTimeout);
if (maxTimeout > 0) {
LOG.debug("WATCH: creating new Pipe...");
return new Pipe (bidirPipe, maxTimeout);
} else {
throw new IOException ("Connect timed out.");
}
}
}
/**
* Instances of this class are returned by ReliablePipeService.connect
* and ReliablePipeService.AcceptPipe.accept.
*/
public class Pipe implements InputPipe, OutputPipe {
IncomingMessageQueue incomingQueue;
IncomingMessageQueue incomingQueue2;
OutgoingMessageQueue outgoingQueue;
HashMap pendingRetransmits;
long outgoingSeq; // sequence counter for outgoing sequences
long highestInSeq; // highest in-sequence message received
long lastAckedSeq; // the last acknowledged message
long synSeq = -1; // the sequence number sent with SYN
InputPipe inputPipe;
OutputPipe outputPipe;
Receiver receiver;
Sender sender;
int outgoingWindow;
int state;
boolean running = false;
Object lock = new Object();
Vector listeners = new Vector();
Pipe (BidirectionalPipeService.Pipe bidirPipe, long maxTimeout)
throws IOException {
LOG.debug("WATCH: Pipe constructor");
inputPipe = bidirPipe.getInputPipe ();
outputPipe = bidirPipe.getOutputPipe ();
incomingQueue = new IncomingMessageQueue (512);
incomingQueue2 = new IncomingMessageQueue (512);
outgoingQueue = new OutgoingMessageQueue (512);
pendingRetransmits = new HashMap ();
state = ESTABLISHING;
LOG.debug("WATCH: starting receiver");
running = true;
new Thread (receiver = new Receiver ()).start ();
establish (maxTimeout);
}
/**
* Get the InputPipe on which messages are guaranteed
* to be delivered in-order.
*/
public InputPipe getInputPipe () {
return this;
}
/**
* Get the OutputPipe which guarnatees that all messages
* sent through it will be delivered in-order.
*/
public OutputPipe getOutputPipe () {
return this;
}
void establish (long maxTimeout) throws IOException {
LOG.debug("WATCH: sending SYN...");
sendMsg (SYN, synSeq, 0);
try {
while (state == ESTABLISHING)
synchronized (lock) {
lock.wait (maxTimeout);
}
} catch (InterruptedException e) {
throw new IOException ("Interrupted while establishing " +
"connection.");
}
if (state == CONNECTED) {
// start the sender
LOG.debug("WATCH: starting sender...");
new Thread (sender = new Sender ()).start ();
} else {
throw new IOException ("Connect timed out.");
}
}
/**
* Initiate connection tear-down protocol. All messages
* already buffered by send() are guaranteed to be delivered.
*/
public void close () {
if (state != CLOSE_WAIT) {
// tell sender to flush all remaining messages
state = CLOSE_WAIT;
// tell the other side to flush everything to us
try {
sendMsg (FIN1, -1, 0);
} catch (IOException e) {
}
// remaining messages will be sent asynchronously
}
}
public synchronized void send (Message msg)
throws IOException
{
switch (state) {
case ESTABLISHING:
throw new Error ("Should never be here. Theis state is " +
"only while the contructor is running.");
case CONNECTED:
LOG.debug("WATCH: enqueuing message...");
try {
outgoingQueue.enqueue (msg);
} catch (InterruptedException e) {
throw new IOException ("Thread was interrupted while in " +
"send(): " + e.getMessage ());
}
break;
case TIMED_OUT:
throw new IOException ("The peer at the other end has " +
"dropped the connection.");
case CLOSE_WAIT:
throw new IOException ("Shutdown in progress.");
case CLOSED:
throw new IOException ("Connection has closed.");
}
}
class Receiver implements Runnable {
public void run () {
Message msg;
while (state != CLOSED) {
try {
msg = inputPipe.poll (MSG_TIMEOUT);
LOG.debug("WATCH: got message " + msg);
if (!running) {
break;
} else if (msg != null) {
LOG.debug("WATCH: message not null, processing");
process (msg); }
} catch (InterruptedException e) {
break;
} catch (IOException e) {
e.printStackTrace ();
}
}
}
}
class Sender implements Runnable {
public void run () {
Message m;
LOG.debug("WATCH: sending messages on queue");
//synchronized (outgoingQueue) {
while (state != CLOSE_WAIT /* && !outgoingQueue.isEmpty () */ ) {
Timer t = new Timer ();
t.start ();
LOG.debug("WATCH: outgoingWindow = " + outgoingWindow);
int i;
for (i = 0; i < outgoingWindow; i++) {
try {
LOG.debug("WATCH: getting message off the queue, i = " + i);
LOG.debug("WATCH: elapsed time = " + t.elapsed());
long timeLeft = WINDOW_FLUSH_TIMEOUT - t.elapsed();
if(timeLeft > 0 ) {
m = outgoingQueue.dequeue(timeLeft);
} else if(!outgoingQueue.isEmpty()) {
m = outgoingQueue.dequeue(0);
} else {
break;
}
/*
m = outgoingQueue.get (i,timeLeft);
*/
if (m == null) {
LOG.debug("WATCH: No message :-(");
break;
} else {
try {
LOG.debug("WATCH: sending message " + outgoingSeq + " ping = " + m.getString("count"));
sendMsg (m, outgoingSeq++);
} catch (IOException e) {
// is taken care of automatically
// by retransmits
}
}
} catch (InterruptedException e) {
break;
}
}
// }
}
if (state == CLOSE_WAIT)
try {
sendMsg (FIN2, outgoingSeq++, 0);
} catch (IOException e) {
}
}
}
public synchronized Message waitForMessage () throws InterruptedException
{
return poll (-1);
}
public synchronized Message poll (int msgTimeout)
throws InterruptedException {
return incomingQueue.pop (msgTimeout);
}
public synchronized void addPipeMsgListener(PipeMsgListener listener) {
if(!listeners.contains(listener)) {
listeners.add(listener);
}
}
public synchronized void removePipeMsgListener(PipeMsgListener listener) {
listeners.remove(listener);
}
void notifyListeners() {
Message msg = incomingQueue.popNext();
while (msg != null) {
// notify all listeners
PipeMsgEvent pipeMsgEvent =
new PipeMsgEvent(this,msg,null);
Iterator iter = listeners.iterator();
while(iter.hasNext()) {
PipeMsgListener listener = (PipeMsgListener)iter.next();
listener.pipeMsgEvent(pipeMsgEvent);
}
msg = incomingQueue.popNext();
}
}
void process (Message msg) throws IOException {
DataInputStream header =
new DataInputStream (msg.getElement(RELIABLE_HEADER).getStream());
long seq = header.readLong ();
int type = header.readInt ();
int window = header.readInt ();
// ack the message
if (type != SYN && type != SYN_ACK && type != ACK) {
incomingQueue2.put (msg, seq);
highestInSeq =
incomingQueue2.popUntilBreakInSequence (highestInSeq);
LOG.debug("WATCH: Sending ACK " + highestInSeq);
LOG.debug("WATCH: Free Slots = " + incomingQueue.getFreeSlots ());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -