📄 endpointrouter.java
字号:
return false;
}
r.display();
// Here we're synchronized. Check that we do not have that route
// after all.
if (directRoutes.containsKey(r.dest)) return false; // nothing new.
// Check if the route makes senses (loop detection)
if (!checkRoute (r)) {
// Route is invalid. Drop it.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug ("Route is invalid");
return false;
}
if ( !directRoutes.containsKey (r.router)) {
// We don't know how to reach this route. Ignore.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug ("Unreachable route - ignore");
return false;
}
// If the last hop is not in the vector of gateways, add it.
try {
String lastInVector = (String) r.gateways.elementAt (0);
if ((lastInVector != null) && !lastInVector.equals (r.router)) {
//PDA requirement 18.02.2002
// Vector.add -> Vector.insertElementAt
// r.gateways.add (0, r.router);
r.gateways.insertElementAt (r.router, 0);
}
} catch (Exception ez1) {
// The vector must be empty, which is not supposed
// to happen.
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn ("Got an empty route - discard");
return false;
}
try {
routedRoutes.put(r.dest, r);
notifyAll(); // Wakeup those waiting for a route.
return true;
} catch (Exception e2) {
// We failed, leave things as they are.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" failed with " + e2);
}
return false;
}
public synchronized void removeRoute(String pId) {
try {
routedRoutes.remove(pId);
} catch (Exception e) {
}
}
/**
* @param msg The message which has just been recieved.
* @param srcAddr The address from which this message was received. This
* is addr is only one hop away.
* @param dstAddr The destination for this message. Should be one of this
* peer's addresses.
*
* @return The message, which should be delivered to a service on this peer.
* null if the message has been routed onward, or if the
* message should be dropped.
*/
public void processIncomingMessage( Message msg,
EndpointAddress srcAddr,
EndpointAddress dstAddr ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processIncomingMessage starts");
InputStream ip = null;
String srcPeer = null; // The originating peer
String destPeer = null; // The destination peer
String lastHop = null; // The last peer that routed this to us
EndpointAddress origSrcAddr = null; // The origin endpointAddr (jxta:)
EndpointAddress origDstAddr = null; // The dest endpointAddr (jxta:)
int nbOfHops = 0; // Nb of hops for the reverse route.
EndpointRouterMessage routerMsg = null;
MessageElement routerElement = msg.getElement(EndpointRouterMessage.Name);
try {
if (routerElement == null) {
// The sender did not use this router
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processIncomingMessage: no routing info");
endpoint.demux(msg);
return;
}
ip = routerElement.getStream();
if (ip == null) {
// The sender did not use this router
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processIncomingMessage: invalid routing info");
endpoint.demux(msg);
return;
}
routerMsg = new EndpointRouterMessage(ip);
try {
nbOfHops = Integer.parseInt (routerMsg.getNbOfHops());
} catch (Exception ez1) {
nbOfHops = 0;
}
origSrcAddr =
endpoint.newEndpointAddress(routerMsg.getSrcAddress());
origDstAddr =
endpoint.newEndpointAddress(routerMsg.getDestAddress());
srcPeer = origSrcAddr.getProtocolName() + "://"
+ origSrcAddr.getProtocolAddress();
destPeer = origDstAddr.getProtocolName() + "://"
+ origDstAddr.getProtocolAddress();
lastHop = routerMsg.getLastHop();
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" srcPeer= " + srcPeer);
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" destPeer= " + destPeer);
if (lastHop != null) if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" lastHop= " + lastHop);
Vector g = routerMsg.getForwardGateways();
if (g != null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" Forward Gateways:");
for (int i = 0; i < g.size(); ++i) {
try {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" [" + i + "] " + (String) g.elementAt(i));
} catch (Exception ez1) {
break;
}
}
}
g = routerMsg.getReverseGateways();
if (g != null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" Reverse Gateways:");
for (int i = 0; i < g.size(); ++i) {
try {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug (" [" + i + "] " + (String) g.elementAt(i));
} catch (Exception ez1) {
break;
}
}
} else if ((nbOfHops >= 1) && (lastHop != null)) {
// This is the case of a peer that has not filled up the optional reverse gateway list.
// Create one.
Vector tmp = new Vector(1);
//PDA requirement 18.02.2002
// Vector.add -> Vector.addElement
// tmp.add (lastHop);
tmp.addElement (lastHop);
routerMsg.setReverseGateways (tmp);
}
} catch (Exception badHdr) {
// Drop it, we do not even know the destination
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Bad routing header or bad message. Message dropped.");
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("exception: " + badHdr);
return;
}
// Is this a loopback ?
if ((srcPeer != null) && srcPeer.equals(localPeerAddr)) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processIncomingMessage: dropped loopback");
return;
}
try {
// Update routing information about that peer.
// Do we already know about that peer ?
if (lastHop != null) {
// We're pretty sure we can talk to this peer directly
// Remove the entry from the negative cache.
triedAndFailed.remove(lastHop);
// NB: learnLocalRoute takes care of the nity-gritty
learnLocalRoute(lastHop);
}
} catch(Exception whatever) {
// If the above failed, we may still be able to route the message.
whatever.printStackTrace();
}
try {
// If this peer is the final destination, then we're done. Just let
// it be pushed up the stack. We must not put back our header;
// it is now meaningless and none of the upper layers business.
// All we have to do is to restore the end-2-end src and dest
// so that the endpoint demux routine can do its job.
if (destPeer.equals(localPeerAddr)) {
// Since the destination is the local peer, it might be a good
// optimization to learn the reverse route now, if there is one
// Now see if the previous hop has a reverse route to the source peer.
if (nbOfHops != 0) {
setRoute (new Route (srcPeer,
lastHop,
nbOfHops,
routerMsg.getReverseGateways()));
}
// Clear the forward gateway of the message since the message
// has been delivered.
routerMsg.setForwardGateways (null);
// Removing the header.
msg.removeElement (EndpointRouterMessage.Name);
// Adding the updated header.
msg.addElement(
msg.newMessageElement (EndpointRouterMessage.Name,
null,
routerMsg.getInputStream()));
msg.setSourceAddress(origSrcAddr);
msg.setDestinationAddress(origDstAddr);
endpoint.demux(msg);
return;
}
boolean headerIsChanged = false;
// Get the next peer into the forward route
String nextHop = null;
if (routerMsg.getForwardGateways() != null) {
nextHop = getNextHop (routerMsg.getForwardGateways());
}
if (nextHop == null) {
// No next hop. Try to use destPeer instead.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug ("No next hop in forward gateways - Try to use destination as next hop");
nextHop = destPeer;
// Clear the forward path which is probably wrong
routerMsg.setForwardGateways (null);
// Do we have a reverse route for the source ?
Route reverseRoute = getRoute (destPeer);
if (reverseRoute != null) {
routerMsg.setReverseGateways (reverseRoute.gateways);
}
headerIsChanged = true;
}
if ((isLocalRoute (destPeer)) && (!nextHop.equals (destPeer))) {
// There is a local route - use it
// Clear the forward path which is probably wrong
// Do we have a reverse route for the source ?
Route reverseRoute = getRoute (destPeer);
if (reverseRoute != null) {
routerMsg.setReverseGateways (reverseRoute.gateways);
}
headerIsChanged = true;
nextHop = destPeer;
}
if (headerIsChanged) {
// Change the router message within the message
// because we want to modify it.
// Removing the header.
// Removing the header.
msg.removeElement (EndpointRouterMessage.Name);
// Adding the updated header.
msg.addElement(
msg.newMessageElement (EndpointRouterMessage.Name,
null,
routerMsg.getInputStream()));
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("forwarding to " + nextHop);
EndpointMessenger messenger = getTransportMessenger(nextHop, true);
// Build a RouterMessenger around it that will add our header.
EndpointMessenger routerMessenger = new RouterMessenger(messenger,
origDstAddr,
endpoint,
this);
routerMessenger.sendMessage(msg);
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug ("Message has been forwarded to the next hop");
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("failed to deliver or forward: " + e);
}
}
// Get the next hop within a vector of gateways
private String getNextHop (Vector v) {
if ((v == null) || (v.size() == 0)) {
// Empty vector.
return null;
}
// Find the local peer within the vector
String nextHop = null;
int i = v.indexOf (localPeerAddr);
if (i == -1) {
// The local peer is not into the vector. Since we have got that
// message, the best we can do is to send it to the first gateway
// in the forward path.
try {
nextHop = (String) v.elementAt (0);
} catch (Exception ez1) {
// Should not fail, but if it does, there is not much we can do
return null;
}
return nextHop;
}
// Found the local peer within the vector of gateways. Get the next gateway
try {
nextHop = (String) v.elementAt (i + 1);
} catch (Exception ez1) {
// There is no next hop
return null;
}
return nextHop;
}
private Hashtable triedAndFailed = new Hashtable();
public EndpointAddress checkPeer(String peer) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("checkPeer peer= " + peer);
// CheckPeer is realy lazy because what it does is expensive.
// When needed, the negative info that prevents its from working
// too much is removed. (see the calls to learnLocalRoute).
Long lastTry = (Long) triedAndFailed.get(peer);
if (lastTry != null) {
long diff = System.currentTimeMillis() - lastTry.longValue();
if (diff > 0 && diff < 300000) {
// Too early to retry (less than 5 minutes).
return null;
} else {
// It is ancient knowlege remove it.
triedAndFailed.remove(peer);
}
}
// Never tried or it was a long time ago.
// Get (locally) the advertisement of this peer
Enumeration advs = null;
PeerAdvertisement adv = null;
try {
advs = getPeerAdv(peer);
if (advs == null || !advs.hasMoreElements()) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no advertisement");
// Can't keep negative info in that case; the absence of adv
// is too volatile. We want to take advantage of one as soon
// as it appears; notably during startup.
return null;
}
while (advs.hasMoreElements()) {
adv = (PeerAdvertisement) advs.nextElement();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -