📄 routeresolver.java
字号:
return Module.START_OK; } /** * {@inheritDoc} * <p/> * Careful that stopApp() could in theory be called before startApp(). */ public void stopApp() { resolver.unregisterHandler(routerSName); // unregister SRDI resolver.unregisterSrdiHandler(routerSName); srdiIndex.stop(); membership.removePropertyChangeListener("defaultCredential", membershipCredListener); currentCredential = null; resolver = null; srdi = null; membership = null; } /** * return routeResolver usage * * @return routeResolver usage */ boolean useRouteResolver() { return useRouteResolver; } /** * enable routeResolver usage * @param enable if true, enables route resolver */ void enableRouteResolver(boolean enable) { useRouteResolver = enable; } /** * issue a new route discovery resolver request * * @param peer the destination as a logical endpoint address */ protected void findRoute(EndpointAddress peer) { RouteAdvertisement myRoute = router.getMyLocalRoute(); // No need to pursue further if we haven't initialized our own route as // responding peers are not going to be able to respond to us. if (myRoute == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Cannot issue a find route if we don\'t know our own route"); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Find route for peer = " + peer); } try { // create a new RouteQuery message RouteQuery doc; // check if we have some bad route information // for that peer, in that case pass the bad hop count BadRoute badRoute; badRoute = router.getBadRoute(peer); if (badRoute != null) { // ok we have a bad route // pass the bad hops info as part of the query if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("findRoute sends query: known bad Hops" + badRoute); } doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, badRoute.getBadHops()); } else { doc = new RouteQuery(EndpointRouter.addr2pid(peer), myRoute, null); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending query for peer : " + peer); } XMLDocument credentialDoc; CurrentCredential current = currentCredential; if (null != current) { credentialDoc = current.credentialDoc; } else { credentialDoc = null; } ResolverQuery query = new ResolverQuery(routerSName, credentialDoc, localPeerId.toString(), doc.toString(), qid.incrementAndGet()); // only run SRDI if we are a rendezvous // FIXME 20060106 bondolo This is not dynamic enough. The route // resolver needs to respond to changes in rendezvous configuration // at runtime. if (group.isRendezvous()) { // check where to send the query via SRDI List<PeerID> results; if (srdiIndex != null) { // try to find a least 10 entries, will pick up one // randomly. This will protect against retry. It is // likely that a number of RDV will know about a route results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, EndpointRouter.addr2pid(peer).toString(), 10); if (results != null && !results.isEmpty()) { // use SRDI to send the query // remove any non rdv peers from the candidate list // and garbage collect the index in the process List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results); if (!clean.isEmpty()) { // The purpose of incrementing the hopcount // when an SRDI index match is found (we got a // pointer to a rdv that should have the route) is to // restrict any further forwarding. The increment // count is only done when a matching SRDI index is // found. Not when the replica is selected as we // still need to forward the query. This restriction // is purposelly done to avoid too many longjumps // within a walk. query.incrementHopCount(); srdi.forwardQuery(clean, query, 1); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("found an srdi entry forwarding query to SRDI peer"); } return; } } else { // it is not in our cache, look for the replica peer // we need to send the query PeerID destPeer = srdi.getReplicaPeer(EndpointRouter.addr2pid(peer).toString()); if (destPeer != null && !destPeer.equals(localPeerId)) { // don't push anywhere if we do not have a replica // or we are trying to push to ourself if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("processQuery srdiIndex DHT forward :" + destPeer); } srdi.forwardQuery(destPeer, query); return; } else { LOG.fine("processQuery srdiIndex DHT forward resulted in no op"); } } } } // if we reach that point then we just use the resolver walk resolver = group.getResolverService(); if (resolver != null) { resolver.sendQuery(null, query); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("find route query sent"); } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("cannot get the resolver service"); } } } catch (Exception ee) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Exception in findRoute", ee); } } } /** * {@inheritDoc} * <p/> * This is called by the Generic ResolverServiceImpl when processing a * response to a query. */ public void processResponse(ResolverResponseMsg response) { if (!useRouteResolver) { // Route resolver disabled return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("processResponse got a response"); } // convert the response into a RouteResponse RouteResponse doc = null; try { Reader ip = new StringReader(response.getResponse()); XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip); doc = new RouteResponse(asDoc); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "malformed response - discard", e); } return; } RouteAdvertisement dstRoute = doc.getDestRoute(); RouteAdvertisement srcRoute = doc.getSrcRoute(); int queryId = response.getQueryId(); EndpointAddress routingPeer = EndpointRouter.pid2addr(srcRoute.getDestPeerID()); EndpointAddress destPeer = EndpointRouter.pid2addr(dstRoute.getDestPeerID()); // check if we have a negative route response if (queryId == NACKROUTE_QUERYID) { AccessPointAdvertisement badHop = dstRoute.nextHop(EndpointRouter.addr2pid(routingPeer)); PeerID badPeer; if (badHop != null) { badPeer = badHop.getPeerID(); } else { // the bad hop is the final destination badPeer = dstRoute.getDestPeerID(); } processBadRoute(badPeer, dstRoute); return; } // This is not our own peer adv, so we must not keep it // for more than its expiration time. // we only need to publish this route if // we don't know about it yet // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding // incoming messengers. if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID()))) && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) { router.updateRouteAdv(srcRoute); } if (destPeer.equals(routingPeer)) { // The dest peer itself managed to respond to us. That means we // learned the route from the reverseRoute in the message // itself. So, there's nothing we need to do. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("learn route directly from the destination"); } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("learn route:" + routingPeer); } try { // build the candidate route using the // route response from the respondant peer RouteAdvertisement candidateRoute = RouteAdvertisement.newRoute(EndpointRouter.addr2pid(destPeer), EndpointRouter.addr2pid(routingPeer),(Vector) dstRoute.getVectorHops().clone()); // cleanup the candidate route from any loop and remove the local peer extra // cycle RouteAdvertisement.cleanupLoop(candidateRoute, (PeerID) localPeerId); // Is there anything left in that route (or did the respondant // believe that we are the last hop on the route - which // obviously we are not. if (candidateRoute.size() == 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Route response outdated: NACK responder"); } generateNACKRoute(EndpointRouter.addr2pid(routingPeer), EndpointRouter.addr2pid(destPeer), dstRoute.getVectorHops()); return; } // get the address of the first hop in the route to verify that // we have a route (direct or long) to the first hop, so the route // is valid EndpointAddress candidateRouter = EndpointRouter.pid2addr(candidateRoute.getFirstHop().getPeerID()); // check that we have a direct connection to the first hop if (router.ensureLocalRoute(candidateRouter, null) == null) { // If we do not have a direct route to the candidate router check // for a long route in that case stich the route RouteAdvertisement routeToRouter = router.getRoute(candidateRouter, false); if (routeToRouter == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Route response useless: no route to next router hop"); } return; } // stich the route removing any loops and localPeer cycle if (RouteAdvertisement.stichRoute(candidateRoute, routeToRouter, (PeerID) localPeerId)) { router.setRoute(candidateRoute, false); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Route response error stiching route response"); } return; } } else { // we have a direct connection with the first hop of the candidate route // set the new route, which starts with the peer that replied to us. router.setRoute(candidateRoute, false); } } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure building response route", ex); LOG.warning(" bad dstRoute: " + dstRoute.display()); } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("finish process route response successfully"); } } } /** * bad route, so let's remove everything we have so * we can start from scratch. We are maintaining a * bad route up to DEFAULT_ROUTE expiration after * that we consider it to be ok to retry the same route * We are removing both the route and peer advertisement * to force a new route query * * @param badHop source PeerID of NACK route info * @param dest original route information */ private void processBadRoute(PeerID badHop, RouteAdvertisement dest) { EndpointAddress addr = EndpointRouter.pid2addr(dest.getDestPeerID()); if (addr == null) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("remove bad route has a bad route info - discard"); } return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("remove bad route info for dest " + dest.display()); if (badHop != null) { LOG.fine("remove bad route bad hop " + badHop); } } try { // check first that we still have the same route, we may already // using a new route RouteAdvertisement currentRoute = router.getRoute(addr, false);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -