📄 endpointrouter.java
字号:
} } else { // we get a new route badRoutes.remove(pidAddr); } // Check if the route makes senses (loop detection) if (!checkRoute(r)) { // Route is invalid. Drop it. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Route is invalid"); } return false; } // check if we can reach the first hop in the route // We only do a shallow test of the first hop. Whether more effort // is worth doing or not is decided (and done) by the invoker. if (!isLocalRoute(pid2addr(r.getFirstHop().getPeerID()))) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Unreachable route - ignore"); } return false; } } catch (Exception ez1) { // The vector must be empty, which is not supposed // to happen. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Got an empty route - discard" + r.display()); } return false; } // add the new route try { // push the route to SRDI only if it is a new route. the intent is // to minimize SRDI traffic. The SRDIinformation is more of the order // this peer has a route to this destination, it does not need to be // updated verey time the route is updated. Information about knowing // that this peer has a route is more important that the precise // route information // SRDI is run only if the peer is acting as a rendezvous if (group.isRendezvous()) { if (!routedRoutes.containsKey(pidAddr)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("push new SRDI route " + pid); } pushNeeded = true; } } // new route so publish the known route in our cache if (!routedRoutes.containsKey(pidAddr)) { routeCM.createRoute(r); newDestinations.add(pidAddr); } // Remove any endpoint addresses from the route // as part of the cloning. We just keep track // of PIDs in our route table RouteAdvertisement newRoute = (RouteAdvertisement) r.cloneOnlyPIDs(); routedRoutes.put(pidAddr, newRoute); // We can get rid of any negative info we had. We have // a new and different route. badRoutes.remove(pidAddr); notifyAll(); // Wakeup those waiting for a route. status = true; } catch (Exception e2) { // We failed, leave things as they are. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(" failed setting route with " + e2); } status = false; } } // due to the potential high latency of making the // srdi revolver push we don't want to hold the lock // on the EndpointRouter object as we may have to // discover a new route to a rendezvous if (pushNeeded && status) { // we are pushing the SRDI entry to a replica peer routeResolver.pushSrdi(null, pid); } return status; } /** * This method is used to remove a route * * @param pId route to peerid to be removed */ protected void removeRoute(EndpointAddress pId) { boolean needRemove; synchronized (this) { needRemove = false; if (routedRoutes.containsKey(pId)) { if (group.isRendezvous()) { // Remove the SRDI cache entry from the SRDI cache needRemove = true; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("remove SRDI route " + pId); } } routedRoutes.remove(pId); } } // due to the potential high latency of pushing // the SRDI message we don't want to hold the EndpointRouter // object lock if (needRemove) { // We are trying to flush it from the replica peer // Note: this is not guarantee to work if the peerview // is out of sync. The SRDI index will be locally // repaired as the peerview converge routeResolver.removeSrdi(null, addr2pid(pId)); } } /** * {@inheritDoc} */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { EndpointAddress srcPeer = null; // The originating peer EndpointAddress destPeer = null; // The destination peer EndpointAddress lastHop = null; // The last peer that routed this to us boolean connectLastHop = false; ; // true:Try connecting to lastHop EndpointAddress origSrcAddr = null; // The origin endpointAddr (jxta:) EndpointAddress origDstAddr = null; // The dest endpointAddr (jxta:) Vector origHops = null; // original route of the message EndpointRouterMessage routerMsg = null; EndpointAddress nextHop = null; RouteAdvertisement radv = null; // We do not want the existing header to be ignored of course. routerMsg = new EndpointRouterMessage(msg, false); if (!routerMsg.msgExists()) { // The sender did not use this router if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("No routing info for " + msg ); } return; } try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(routerMsg.display()); } origSrcAddr = routerMsg.getSrcAddress(); origDstAddr = routerMsg.getDestAddress(); // convert the src and dest addresses into canonical // form stripping service info srcPeer = new EndpointAddress(origSrcAddr, null, null); destPeer = new EndpointAddress(origDstAddr, null, null); if (routerMsg.getLastHop() != null) { lastHop = new EndpointAddress(routerMsg.getLastHop()); } // See if there's an originator full route adv inthere. // That's a good thing to keep. radv = routerMsg.getRouteAdv(); if (radv != null) { // publish the full route adv. Also, leave it the // message. It turns out to be extremely usefull to // peers downstream, specially the destination. If // this here peer wants to embed his own radv, it will // have to wait; the one in the message may not come // again. // FIXME - jice@jxta.org 20040413 : all this could wait (couldn't it ?) // until we know it's needed, therefore parsing could wait as well. // Looks like a safe bet to try and ensure a // connection in the opposite direction if there // isn't one already. if (pid2addr(radv.getDestPeerID()).equals(lastHop)) { connectLastHop = true; } // Make the most of that new adv. setRoute(radv, true); updateRouteAdv(radv); } } catch (Exception badHdr) { // Drop it, we do not even know the destination if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Bad routing header or bad message. Dropping " + msg ); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Exception: ", badHdr); } return; } // Is this a loopback ? if ((srcPeer != null) && srcPeer.equals(localPeerAddr)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processIncomingMessage: dropped loopback"); } return; } // Are we already sending to ourself. This may occur // if some old advertisements for our EA is still // floating around if ((lastHop != null) && lastHop.equals(localPeerAddr)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processIncomingMessage: dropped loopback from impersonating Peer"); } return; } // We have to try and reciprocate the connection, so that we // have chance to learn reverse routes early enough. If we do // not already have a messenger, then we must know a route adv // for that peer in order to be able to connect. Otherwise, // the attempt will fail and we'll be left with a negative // entry without having realy tried anything. To prevent that // we rely on the presence of a radv in the router message. If // there's no radv, two possibilities: // // - It is not the first contact from that peer and we already // have tried (with or without success) to reciprocate. // // - It is the first contact from that peer but it has not // embedded its radv. In the most likely case (an edge peer // connecting to a rdv), the edge peer will have no difficulty // finding the reverse route, provided that we do not make a // failed attempt right now. // // Conclusion: if there's no embedded radv in the message, do // nothing. if (connectLastHop) { ensureLocalRoute(lastHop, radv); } try { // Normalize the reverseHops vector from the message and, if it // looks well formed and usefull, learn from it. Do we have a // direct route to the origin ? If yes, we'll zap the revers // route in the message: we're a much better router. else, // learn from it. As a principle we regard given routes to be // better than existing ones. Vector reverseHops = routerMsg.getReverseHops(); if (reverseHops == null) { reverseHops = new Vector(); } // check if we do not have a direct route // in that case we don't care to learn thelong route if (!isLocalRoute(srcPeer)) { // Check if the reverseRoute info looks correct. if (lastHop != null) { // since we are putting the lasthop in the // reverse route to indicate the validity of // lastop to reach the previous hop, we have // the complete route, just clone it. (newRoute // owns the given vector) if ((reverseHops.size() > 0) && ((AccessPointAdvertisement) reverseHops.firstElement()).getPeerID().equals(addr2pid(lastHop))) { // Looks like a good route to learn setRoute(RouteAdvertisement.newRoute(addr2pid(srcPeer), (Vector) reverseHops.clone()), true); } } } // If this peer is the final destination, then we're done. Just let // it be pushed up the stack. We must not leave our header; // it is now meaningless and none of the upper layers business. // All we have to do is to supply the end-2-end src and dest // so that the endpoint demux routine can do its job. if (destPeer.equals(localPeerAddr)) { // Removing the header. routerMsg.clearAll(); routerMsg.updateMessage(); // receive locally endpoint.processIncomingMessage(msg, origSrcAddr, origDstAddr); return; } // WATCHOUT: if this peer is part of the reverse route // it means that we've seen that message already: there's // a loop between routers ! If that happens drop that // message as if it was burning our fingers // First build the ap that we might add to the reverse route. // We need it to look for ourselves in reverseHops. (contains // uses equals(). equals will work because we always include // in reversehops aps that have only a pid. AccessPointAdvertisement selfAp = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); selfAp.setPeerID(addr2pid(localPeerAddr)); if (reverseHops.contains(selfAp)) { // Danger, bail out ! // Better not to try to NACK for now, but get rid of our own // route. If we're sollicited again, there won't be a loop // and we may NACK. if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Routing loop detected. Message dropped"); } removeRoute(destPeer); return; } // Upda
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -