📄 routeresolver.java
字号:
return 0; } /** * {@inheritDoc} * * <p/>Carefull that stopApp() could in theory be called before startApp(). */ public void stopApp() { resolver.unregisterHandler(routerSName); // unregister SRDI resolver.unregisterSrdiHandler(routerSName); srdiIndex.stop(); membership.removePropertyChangeListener("defaultCredential", membershipCredListener); membershipCredListener = null; credential = null; credentialDoc = null; resolver = null; srdi = null; membership = null; } /** * issue a new route discovery resolver request * * @param peer the destination as a logical enpoint address */ protected void findRoute(EndpointAddress peer) { RouteAdvertisement myRoute = router.getMyLocalRoute(); // No need to pursue further if we haven't // initialize our own route as responding // peers are not going to be able to respond to us. if (myRoute == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Cannot issue a find route if we don't know our own route"); } return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Find route for peer = " + peer); } try { // create a new RouteQuery message RouteQuery doc = null; // check if we have some bad route information // for that peer, in that case pass the bad hop count BadRoute badRoute; badRoute = (BadRoute) router.getBadRoute(peer); if (badRoute != null) { // ok we have a bad route // pass the bad hops info as part of the query if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("findRoute sends query: known bad Hops" + badRoute.display()); } doc = new RouteQuery(router.addr2pid(peer), myRoute, badRoute.getHops()); } else { doc = new RouteQuery(router.addr2pid(peer), myRoute, null); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending query for peer : " + peer); } ResolverQuery query = new ResolverQuery(routerSName, credentialDoc, localPeerId.toString(), doc.toString(), qid++); // only run SRDI if we are a rendezvous if (group.isRendezvous()) { // check where to send the query via SRDI Vector results = null; if (srdiIndex != null) { // try to find a least 10 entries, will pick up one // randomly. This will protect against retry. It is // likely that a number of RDV will know about a route results = srdiIndex.query("route", "DstPID", router.addr2pid(peer).toString(), 10); if (results != null && results.size() > 0) { // use SRDI to send the query // remove any non rdv peers from the candidate list // and garbage collect the index in the process Vector clean = cleanupAnyEdges(query.getSrc(), results); if (clean.size() > 0) { // The purpose of incrementing the hopcount // when an SRDI index match is found (we got a // pointer to a rdv that should have the route) is to // restrict any further forwarding. The increment // count is only done when a matching SRDI index is // found. Not when the replica is selected as we // still need to forward the query. This restriction // is purposelly done to avoid too many longjumps // within a walk. query.incrementHopCount(); srdi.forwardQuery(clean, query, 1); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("found an srdi entry forwarding query to SRDI peer"); } return; } } else { // it is not in our cache, look for the replica peer // we need to send the query PeerID destPeer = srdi.getReplicaPeer(router.addr2pid(peer).toString()); if (destPeer != null && !destPeer.equals(localPeerId)) { // don't push anywhere if we do not have a replica // or we are trying to push to ourself if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processQuery srdiIndex DHT forward :" + destPeer); } srdi.forwardQuery(destPeer.toString(), query); return; } } } } // if we reach that point then we just use the resolver walk resolver = group.getResolverService(); if (resolver != null) { resolver.sendQuery(null, query); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("find route query sent"); } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("cannot get the resolver service"); } } } catch (Exception ee) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Exception in findRoute", ee); } } } /** * * {@inheritDoc} * This is called by the Generic ResolverServiceImpl when processing a response to a query. **/ public void processResponse(ResolverResponseMsg response) { if (!useRouteResolver) { // Route resolver disabled return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processResponse got a response"); } // convert the response into a RouteResponse Reader ip = null; RouteResponse doc = null; try { ip = new StringReader(response.getResponse()); StructuredTextDocument asDoc = (StructuredTextDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip); doc = new RouteResponse(asDoc); } catch (Throwable e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processResponse: malformed response - discard", e); } return; } finally { try { if (null != ip) { ip.close(); ip = null; } } catch (Throwable ignored) {} } RouteAdvertisement dstRoute = doc.getDestRoute(); RouteAdvertisement srcRoute = doc.getSrcRoute(); int queryId = response.getQueryId(); if ((dstRoute == null) || (srcRoute == null)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processResponse: malformed response - discard."); } // Malformed response. Just discard. return; } EndpointAddress routingPeer = router.pid2addr(srcRoute.getDestPeerID()); EndpointAddress destPeer = router.pid2addr(dstRoute.getDestPeerID()); if ((routingPeer == null) || (destPeer == null)) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processResponse: malformed PeerID in response - discard."); } // Malformed response. Just discard. return; } // check if we have a negative route response if (queryId == NACKROUTE_QUERYID) { AccessPointAdvertisement badHop = dstRoute.nextHop(router.addr2pid(routingPeer)); PeerID badPeer = null; if (badHop != null) { badPeer = badHop.getPeerID(); } else { // the bad hop is the final destination badPeer = dstRoute.getDestPeerID(); } processBadRoute(badPeer, dstRoute); return; } // This is not our own peer adv, so we must not keep it // for more than its expiration time. // we only need to publish this route if // we don't know about it yet // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding // incoming messengers. if ((!router.isLocalRoute(router.pid2addr(srcRoute.getDestPeerID()))) && (!router.isRoutedRoute(router.pid2addr(srcRoute.getDestPeerID())))) { router.updateRouteAdv(srcRoute); } if (destPeer.equals(routingPeer)) { // The dest peer itself managed to respond to us. That means we // learned the route from the reverseRoute in the message // itself. So, there's nothing we need to do. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("learn route directly from the destination"); } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("learn route:" + routingPeer); } try { // build the candidate route using the // route response from the respondant peer RouteAdvertisement candidateRoute = RouteAdvertisement.newRoute(router.addr2pid(destPeer), router.addr2pid(routingPeer), (Vector) dstRoute.getVectorHops().clone()); // cleanup the candidate route from any loop and remove the local peer extra // cycle RouteAdvertisement.cleanupLoop(candidateRoute, (PeerID) localPeerId); // Is there anything left in that route (or did the respondant // believe that we are the last hop on the route - which // obviously we are not. if (candidateRoute.size() == 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Route response outdated: NACK responder"); } generateNACKRoute(router.addr2pid(routingPeer), router.addr2pid(destPeer), dstRoute.getVectorHops()); return; } // get the address of the first hop in the route to verify that // we have a route (direct or long) to the first hop, so the route // is valid EndpointAddress candidateRouter = router.pid2addr(candidateRoute.getFirstHop().getPeerID()); // check that we have a direct connection to the first hop if (router.ensureLocalRoute(candidateRouter, null) == null) { // If we do not have a direct route to the candidate router check // for a long route in that case stich the route RouteAdvertisement routeToRouter = router.getRoute(candidateRouter, false); if (routeToRouter == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Route response useless: no route to next router hop"); } return; } // stich the route removing any loops and localPeer cycle if (RouteAdvertisement.stichRoute(candidateRoute, routeToRouter, (PeerID) localPeerId)) { router.setRoute(candidateRoute, false); } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Route response error stiching route response"); } return; } } else { // we have a direct connection with the first hop of the candidate route // set the new route, which starts with the peer that replied to us. router.setRoute(candidateRoute, false); } } catch (Exception ex) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Route response exception when building response route" + ex); LOG.debug(" bad dstRoute: " + dstRoute.display()); } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("finish process route response successfully"); } } } /** * bad route, so let's remove everything we have so * we can start from scratch. We are maintaining a * bad route up to DEFAULT_ROUTE experition after * that we consider it to be ok to retry the same route * We are removing both the route and peer advertisement * to force a new route query * * @param src peer that reply with the NACK route info * @param dest original route information */ private void processBadRoute(PeerID badHop, RouteAdvertisement dest) { EndpointAddress addr = router.pid2addr(dest.getDestPeerID()); if (addr == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("remove bad route has a bad route info - discard"); } return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("remove bad route info for dest " + dest.display()); if (badHop != null) { LOG.debug("remove bad route bad hop " + badHop); } } try { // check first that we still have the same route, we may already // using a new route RouteAdvertisement currentRoute = router.getRoute(addr, false); if (currentRoute == null) { // we already cleanup the route info return; } // check if we still have the old bad route, we may have // already updated the route if (!currentRoute.equals(dest)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -