📄 endpointrouter.java
字号:
LOG.fine("route is ok"); } return true; } } // Adds a new long route provided there not a direct one already. // Replaces any longer route. return true if the route was truely new. // The whole deal must be synch. We do not want to add a long route // while a direct one is being added in parallell or other stupid things like that. /** * set new route info * * @param route new route to learn * @param force true if the route was obtained by receiving * a message * @return true if route was truly new */ boolean setRoute(RouteAdvertisement route, boolean force) { PeerID peerID; EndpointAddress peerAddress; boolean pushNeeded = false; boolean status; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("setRoute:"); } if (route == null) { return false; } synchronized (this) { try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(route.display()); } peerID = route.getDest().getPeerID(); peerAddress = pid2addr(peerID); // Check if we are in the case where we are // setting a new route as we received a message // always force the new route setup when we received a // a message if (!force) { // check if we have some bad NACK route info for // this destination BadRoute badRoute = badRoutes.get(peerAddress); if (badRoute != null) { Long nextTry = badRoute.getExpiration(); if (nextTry > System.currentTimeMillis()) { // check if the route we have in the NACK cache match the // new one. Need to make sure that we clean the route // from any endpoint addresses as the badRoute cache only // contains PeerIDs RouteAdvertisement routeClean = route.cloneOnlyPIDs(); if (routeClean.equals(badRoute.getRoute())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("try to use a known bad route"); } return false; } } else { // expired info, just flush NACK route cache badRoutes.remove(peerAddress); } } } else { // we get a new route badRoutes.remove(peerAddress); } // Check if the route makes senses (loop detection) if (!checkRoute(route)) { // Route is invalid. Drop it. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Route is invalid"); } return false; } // check if we can reach the first hop in the route // We only do a shallow test of the first hop. Whether more effort // is worth doing or not is decided (and done) by the invoker. if (!isLocalRoute(pid2addr(route.getFirstHop().getPeerID()))) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Unreachable route - ignore"); } return false; } } catch (Exception ez1) { // The vector must be empty, which is not supposed to happen. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Got an empty route - discard" + route.display()); } return false; } // add the new route try { // push the route to SRDI only if it is a new route. the intent is // to minimize SRDI traffic. The SRDIinformation is more of the order // this peer has a route to this destination, it does not need to be // updated verey time the route is updated. Information about knowing // that this peer has a route is more important that the precise // route information // SRDI is run only if the peer is acting as a rendezvous if (group.isRendezvous()) { if (!routedRoutes.containsKey(peerID)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("push new SRDI route " + peerID); } pushNeeded = true; } } // new route so publish the known route in our cache if (!routedRoutes.containsKey(peerID)) { routeCM.createRoute(route); newDestinations.add(peerAddress); } // Remove any endpoint addresses from the route // as part of the cloning. We just keep track // of PIDs in our route table RouteAdvertisement newRoute = route.cloneOnlyPIDs(); routedRoutes.put(peerID, newRoute); // We can get rid of any negative info we had. We have // a new and different route. badRoutes.remove(peerAddress); notifyAll(); // Wakeup those waiting for a route. status = true; } catch (Exception e2) { // We failed, leave things as they are. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(" failed setting route with " + e2); } status = false; } } // due to the potential high latency of making the // srdi revolver push we don't want to hold the lock // on the EndpointRouter object as we may have to // discover a new route to a rendezvous if (pushNeeded && status) { // we are pushing the SRDI entry to a replica peer routeResolver.pushSrdi(null, peerID); } return status; } /** * This method is used to remove a route * * @param peerID route to peerid to be removed */ void removeRoute(PeerID peerID) { boolean needRemove; synchronized (this) { needRemove = false; if (routedRoutes.containsKey(peerID)) { if (group.isRendezvous()) { // Remove the SRDI cache entry from the SRDI cache needRemove = true; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("remove SRDI route " + peerID); } } routedRoutes.remove(peerID); } } // due to the potential high latency of pushing // the SRDI message we don't want to hold the EndpointRouter // object lock if (needRemove) { // We are trying to flush it from the replica peer // Note: this is not guarantee to work if the peerview // is out of sync. The SRDI index will be locally // repaired as the peerview converge routeResolver.removeSrdi(null, peerID); } } /** * {@inheritDoc} */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { EndpointAddress srcPeerAddress; EndpointAddress destPeer; EndpointAddress lastHop = null; boolean connectLastHop = false; EndpointAddress origSrcAddr; EndpointAddress origDstAddr; Vector origHops = null; // original route of the message EndpointRouterMessage routerMsg; EndpointAddress nextHop = null; RouteAdvertisement radv; // We do not want the existing header to be ignored of course. routerMsg = new EndpointRouterMessage(msg, false); if (!routerMsg.msgExists()) { // The sender did not use this router if (Logging.SHOW_FINE && LOG.isLoggable(Level.WARNING)) { LOG.warning("Discarding " + msg + ". No routing info."); } return; } try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(routerMsg.display()); } origSrcAddr = routerMsg.getSrcAddress(); origDstAddr = routerMsg.getDestAddress(); // convert the src and dest addresses into canonical // form stripping service info srcPeerAddress = new EndpointAddress(origSrcAddr, null, null); destPeer = new EndpointAddress(origDstAddr, null, null); lastHop = routerMsg.getLastHop(); // See if there's an originator full route adv inthere. // That's a good thing to keep. radv = routerMsg.getRouteAdv(); if (radv != null) { // publish the full route adv. Also, leave it the // message. It turns out to be extremely usefull to // peers downstream, specially the destination. If // this here peer wants to embed his own radv, it will // have to wait; the one in the message may not come // again. // FIXME - jice@jxta.org 20040413 : all this could wait (couldn't it ?) // until we know it's needed, therefore parsing could wait as well. // Looks like a safe bet to try and ensure a // connection in the opposite direction if there // isn't one already. if (pid2addr(radv.getDestPeerID()).equals(lastHop)) { connectLastHop = true; } // Make the most of that new adv. setRoute(radv, true); updateRouteAdv(radv); } } catch (Exception badHdr) { // Drop it, we do not even know the destination if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Bad routing header or bad message. Dropping " + msg); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Exception: ", badHdr); } return; } // Is this a loopback ? if ((srcPeerAddress != null) && srcPeerAddress.equals(localPeerAddr)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("dropped loopback"); } return; } // Are we already sending to ourself. This may occur // if some old advertisements for our EA is still // floating around if ((lastHop != null) && lastHop.equals(localPeerAddr)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("dropped loopback from impersonating Peer"); } return; } // We have to try and reciprocate the connection, so that we // have chance to learn reverse routes early enough. If we do // not already have a messenger, then we must know a route adv // for that peer in order to be able to connect. Otherwise, // the attempt will fail and we'll be left with a negative // entry without having realy tried anything. To prevent that // we rely on the presence of a radv in the router message. If // there's no radv, two possibilities: // // - It is not the first contact from that peer and we already // have tried (with or without success) to reciprocate. // // - It is the first contact from that peer but it has not // embedded its radv. In the most likely case (an edge peer // connecting to a rdv), the edge peer will have no difficulty // finding the reverse route, provided that we do not make a // failed attempt right now. // // Conclusion: if there's no embedded radv in the message, do // nothing. if (connectLastHop) { ensureLocalRoute(lastHop, radv); } try { // Normalize the reverseHops vector from the message and, if it // looks well formed and usefull, learn from it. Do we have a // direct route to the origin ? If yes, we'll zap the revers // route in the message: we're a much better router. else, // learn from it. As a principle we regard given routes to be // better than existing ones. Vector<AccessPointAdvertisement> reverseHops = routerMsg.getReverseHops(); if (reverseHops == null) { reverseHops = new Vector<AccessPointAdvertisement>(); } // check if we do not have a direct route // in that case we don't care to learn thelong route if
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -