📄 endpointrouter.java
字号:
try {
routedRoutes.put(r.dest, r);
notifyAll(); // Wakeup those waiting for a route.
return true;
} catch (Exception e2) {
// We failed, leave things as they are.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" failed with " + e2);
}
return false;
}
/**
 * Forgets the route stored in {@code routedRoutes} under the given key.
 *
 * Removal is best-effort: a failure never propagates to the caller. It is
 * reported on the debug log instead of being dropped without a trace.
 *
 * @param pId the key of the route to remove.
 */
public synchronized void removeRoute(String pId) {
    try {
        routedRoutes.remove(pId);
    } catch (Exception e) {
        // Nothing to undo: the table is left as it was. Keep a trace so
        // that an unexpected failure can still be diagnosed.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("removeRoute(" + pId + ") failed with " + e);
        }
    }
}
/**
 * Handles a message that carries (or may carry) a router header: the
 * message is either pushed up to the local endpoint or forwarded to the
 * next hop. Nothing is returned; every outcome is a side effect.
 *
 * <ul>
 * <li>No router element, or a router element without a stream: the
 *     message is passed unchanged to {@code endpoint.demux}.</li>
 * <li>The header cannot be parsed (or anything else fails while reading
 *     it): the message is dropped.</li>
 * <li>The originating peer is the local peer: the message is dropped
 *     (loopback).</li>
 * <li>The destination peer is the local peer: the header is rewritten
 *     without forward gateways, the end-to-end source and destination
 *     addresses are restored and the message is passed to
 *     {@code endpoint.demux}.</li>
 * <li>Otherwise the message is sent to the next hop through a
 *     {@code RouterMessenger}.</li>
 * </ul>
 *
 * Failures while delivering or forwarding are logged at debug level and
 * do not propagate.
 *
 * @param msg     The message which has just been received.
 * @param srcAddr The address from which this message was received. This
 *                address is only one hop away. Not read by this method.
 * @param dstAddr The destination for this message. Should be one of this
 *                peer's addresses. Not read by this method.
 */
public void processIncomingMessage(Message msg,
                                   EndpointAddress srcAddr,
                                   EndpointAddress dstAddr) {

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processIncomingMessage starts");
    }

    String srcPeer = null;              // The originating peer
    String destPeer = null;             // The destination peer
    String lastHop = null;              // The last peer that routed this to us
    EndpointAddress origSrcAddr = null; // The origin endpointAddr (jxta:)
    EndpointAddress origDstAddr = null; // The dest endpointAddr (jxta:)
    int nbOfHops = 0;                   // Nb of hops for the reverse route.
    EndpointRouterMessage routerMsg = null;

    MessageElement routerElement = msg.getElement(EndpointRouterMessage.Name);

    // Step 1: read the router header.
    try {
        if (routerElement == null) {
            // The sender did not use this router
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("processIncomingMessage: no routing info");
            }
            endpoint.demux(msg);
            return;
        }

        InputStream ip = routerElement.getStream();
        if (ip == null) {
            // The sender did not use this router
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("processIncomingMessage: invalid routing info");
            }
            endpoint.demux(msg);
            return;
        }

        routerMsg = new EndpointRouterMessage(ip);

        try {
            nbOfHops = Integer.parseInt(routerMsg.getNbOfHops());
        } catch (Exception unreadableHopCount) {
            // A missing or malformed hop count is treated as "no hops".
            nbOfHops = 0;
        }

        origSrcAddr = endpoint.newEndpointAddress(routerMsg.getSrcAddress());
        origDstAddr = endpoint.newEndpointAddress(routerMsg.getDestAddress());

        srcPeer = origSrcAddr.getProtocolName() + "://"
                + origSrcAddr.getProtocolAddress();
        destPeer = origDstAddr.getProtocolName() + "://"
                + origDstAddr.getProtocolAddress();

        lastHop = routerMsg.getLastHop();

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            // Concatenating a null lastHop prints "null".
            LOG.debug(
                "Process Incoming : " +
                "\n\tsrcPeer= " + srcPeer +
                "\n\tdestPeer= " + destPeer +
                "\n\tlastHop= " + lastHop);
        }

        Vector forwardGateways = routerMsg.getForwardGateways();
        if (forwardGateways != null) {
            debugDumpGateways(" Forward Gateways:", forwardGateways);
        }

        Vector reverseGateways = routerMsg.getReverseGateways();
        if (reverseGateways != null) {
            debugDumpGateways(" Reverse Gateways:", reverseGateways);
        } else if ((nbOfHops >= 1) && (lastHop != null)) {
            // This is the case of a peer that has not filled up the
            // optional reverse gateway list. Create one.
            Vector tmp = new Vector(1);
            tmp.add(lastHop);
            routerMsg.setReverseGateways(tmp);
        }
    } catch (Exception badHdr) {
        // Drop it, we do not even know the destination
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Bad routing header or bad message. Message dropped.");
        }
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("exception: " + badHdr);
        }
        return;
    }

    // Step 2: is this a loopback ?
    if ((srcPeer != null) && srcPeer.equals(localPeerAddr)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("processIncomingMessage: dropped loopback");
        }
        return;
    }

    // Step 3: update routing information about the peer that handed us
    // the message.
    try {
        if (lastHop != null) {
            // We're pretty sure we can talk to this peer directly.
            // Remove the entry from the negative cache.
            triedAndFailed.remove(lastHop);

            // NB: learnLocalRoute takes care of the nitty-gritty
            learnLocalRoute(lastHop);
        }
    } catch (Exception whatever) {
        // If the above failed, we may still be able to route the message.
        // Report through the logger rather than on stderr.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("processIncomingMessage: could not update the route to last hop "
                     + lastHop, whatever);
        }
    }

    // Step 4: deliver locally or forward.
    try {
        // If this peer is the final destination, then we're done. Just let
        // it be pushed up the stack. All we have to do is to restore the
        // end-2-end src and dest so that the endpoint demux routine can
        // do its job.
        if (destPeer.equals(localPeerAddr)) {

            // Since the destination is the local peer, learn the reverse
            // route to the source now, if the message went through hops.
            if (nbOfHops != 0) {
                setRoute(new Route(srcPeer,
                                   lastHop,
                                   nbOfHops,
                                   routerMsg.getReverseGateways()));
            }

            // Clear the forward gateway of the message since the message
            // has been delivered.
            routerMsg.setForwardGateways(null);

            // Replace the header by the updated one.
            msg.removeElement(EndpointRouterMessage.Name);
            msg.addElement(
                msg.newMessageElement(EndpointRouterMessage.Name,
                                      null,
                                      routerMsg.getInputStream()));

            msg.setSourceAddress(origSrcAddr);
            msg.setDestinationAddress(origDstAddr);
            endpoint.demux(msg);
            return;
        }

        boolean headerIsChanged = false;

        // Get the next peer into the forward route
        String nextHop = null;
        if (routerMsg.getForwardGateways() != null) {
            nextHop = getNextHop(routerMsg.getForwardGateways());
        }

        if (nextHop == null) {
            // No next hop. Try to use destPeer instead.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("No next hop in forward gateways - Try to use destination as next hop");
            }
            nextHop = destPeer;

            // Clear the forward path which is probably wrong
            routerMsg.setForwardGateways(null);

            // NOTE(review): the original comment here read "Do we have a
            // reverse route for the source ?", yet the lookup is keyed by
            // destPeer. Confirm which peer is intended.
            Route reverseRoute = getRoute(destPeer);
            if (reverseRoute != null) {
                routerMsg.setReverseGateways(reverseRoute.gateways);
            }
            headerIsChanged = true;
        }

        if (isLocalRoute(destPeer) && !nextHop.equals(destPeer)) {
            // There is a local route to the destination - use it.
            // NOTE(review): unlike the branch above, the forward gateways
            // are not cleared here, and the lookup is again keyed by
            // destPeer. Confirm that both are intended.
            Route reverseRoute = getRoute(destPeer);
            if (reverseRoute != null) {
                routerMsg.setReverseGateways(reverseRoute.gateways);
            }
            headerIsChanged = true;
            nextHop = destPeer;
        }

        if (headerIsChanged) {
            // The router message was modified: replace the header carried
            // by the message with the updated one.
            msg.removeElement(EndpointRouterMessage.Name);
            msg.addElement(
                msg.newMessageElement(EndpointRouterMessage.Name,
                                      null,
                                      routerMsg.getInputStream()));
        }

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("forwarding to " + nextHop);
        }

        EndpointMessenger messenger = getTransportMessenger(nextHop, true);

        // Build a disposable RouterMessenger around it that will add our
        // header.
        EndpointMessenger routerMessenger = new RouterMessenger(messenger,
                                                                origDstAddr,
                                                                endpoint,
                                                                this,
                                                                false);
        routerMessenger.sendMessage(msg);

        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Message has been forwarded to the next hop");
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("failed to deliver or forward: " + e);
        }
    }
}

// Writes a gateway list to the debug log: the title first, then one line
// per entry. Does nothing when debug logging is disabled. The dump stops
// at the first entry that cannot be printed as a String.
private void debugDumpGateways(String title, Vector gateways) {
    if (!LOG.isEnabledFor(Priority.DEBUG)) {
        return;
    }
    LOG.debug(title);
    for (int i = 0; i < gateways.size(); ++i) {
        try {
            LOG.debug(" [" + i + "] " + (String) gateways.elementAt(i));
        } catch (Exception unprintableEntry) {
            // Diagnostics only: never let the dump affect message handling.
            break;
        }
    }
}
/**
 * Gets the next hop within a vector of gateways.
 *
 * If the local peer ({@code localPeerAddr}) is listed, the next hop is the
 * entry that follows it. If the local peer is not listed, the best we can
 * do, since we have got the message, is to use the first gateway.
 *
 * @param v the gateways, in forward order; may be null or empty.
 * @return the next hop, or null when the vector is null or empty, when the
 *         local peer is the last entry, or when the selected entry is not
 *         a String.
 */
private String getNextHop(Vector v) {
    if (v == null) {
        return null;
    }

    Object candidate;
    // Vector only locks for the duration of each call. Hold its monitor so
    // that the size, the position of the local peer and the element read
    // all describe the same state; this replaces the catch-all handlers
    // that used to absorb index errors.
    synchronized (v) {
        if (v.isEmpty()) {
            // Empty vector.
            return null;
        }

        int localIndex = v.indexOf(localPeerAddr);
        if (localIndex == -1) {
            // The local peer is not in the vector: use the first gateway
            // of the forward path.
            candidate = v.elementAt(0);
        } else if (localIndex + 1 < v.size()) {
            // Found the local peer: the next gateway follows it.
            candidate = v.elementAt(localIndex + 1);
        } else {
            // The local peer is the last gateway: there is no next hop.
            return null;
        }
    }

    // An entry that is not a String cannot be used as a hop.
    return (candidate instanceof String) ? (String) candidate : null;
}
// Negative cache consulted by checkPeer. Keys are peers; values are Long
// timestamps (System.currentTimeMillis()) recorded when checkPeer failed
// with an exception for that peer. While an entry is less than 300000 ms
// old, checkPeer returns null for that peer without trying again; older
// entries are discarded. processIncomingMessage removes the entry of a peer
// as soon as a message arrives with that peer as its last hop.
private Hashtable triedAndFailed = new Hashtable();
public EndpointAddress checkPeer(String peer) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("checkPeer peer= " + peer);
// checkPeer is really lazy because what it does is expensive.
// When needed, the negative info that prevents it from working
// too much is removed (see the calls to learnLocalRoute).
Long lastTry = (Long) triedAndFailed.get(peer);
if (lastTry != null) {
long diff = System.currentTimeMillis() - lastTry.longValue();
if (diff > 0 && diff < 300000) {
// Too early to retry (less than 5 minutes).
return null;
} else {
// It is ancient knowledge; remove it.
triedAndFailed.remove(peer);
}
}
// Never tried or it was a long time ago.
// Get (locally) the advertisement of this peer
Enumeration advs = null;
PeerAdvertisement adv = null;
try {
advs = getPeerAdv(peer);
if (advs == null || !advs.hasMoreElements()) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no advertisement");
// Can't keep negative info in that case; the absence of adv
// is too volatile. We want to take advantage of one as soon
// as it appears; notably during startup.
return null;
}
while (advs.hasMoreElements()) {
adv = (PeerAdvertisement) advs.nextElement();
// Get its EndpointService advertisement
TextElement endpParam = (TextElement)
adv.getServiceParam(PeerGroup.endpointClassID);
if (endpParam == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("checkPeer: no Endpoint Params");
continue;
}
Enumeration endps = endpParam.getChildren("Addr");
if (!endps.hasMoreElements()) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("checkPeer: no Addresses in adv");
continue;
}
Vector addrs = new Vector();
while (endps.hasMoreElements()) {
String saddr = "";
try {
saddr =
((TextElement) endps.nextElement()).getTextValue();
addrs.addElement(endpoint.newEndpointAddress(saddr));
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("bad address");
continue;
}
}
if (addrs.size() > 0) {
EndpointAddress bestAddr = null;
bestAddr = getBestLocalRoute(addrs.elements());
if (bestAddr != null) {
// Found a direct route. Return it.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("found direct route");
return bestAddr;
}
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" failed with " + e);
triedAndFailed.put(peer,
new Long(System.currentTimeMillis()));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -