📄 routeresolver.java
字号:
if (currentRoute == null) { // we already cleanup the route info return; } // check if we still have the old bad route, we may have // already updated the route if (!currentRoute.equals(dest)) { // check if the bad hop is not the destination // if it is then we still have a bad route if (badHop == null) { // we could get the bad hop, so consider the route ok return; } if (badHop.equals(EndpointRouter.addr2pid(addr))) { // check if the new route may still contain the bad hop // the known bad hop is the hop after the src peer that // responded with a NACK route // In this case we also consider the route bad if (!currentRoute.containsHop(badHop)) { return; // we are ok } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("current route is bad because it contains known bad hop" + badHop); } } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("current route is bad because it contains known bad destination" + badHop); } } } // keep the bad one in a cache table so we don't retry them // right away. We use the default route timeout BadRoute badRoute = (router.getBadRoute(addr)); if (badRoute != null) { if (badRoute.getExpiration() > TimeUtils.timeNow()) {// nothing to do. the information is still valid } else { // It is ancient knowlege update it badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION)); } // check if we have to add a new bad hop // to our bad route if (badHop != null) { badRoute.addBadHop(badHop); badRoute.setExpiration(TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION)); } router.setBadRoute(addr, badRoute); return; } else { // create a new NACK route entry Set<PeerID> badHops; if (badHop != null) { badHops = Collections.singleton(badHop); } else { badHops = Collections.emptySet(); } badRoute = new BadRoute(dest, TimeUtils.toAbsoluteTimeMillis(BADROUTE_EXPIRATION), badHops); router.setBadRoute(addr, badRoute); } // remove route from route CM routeCM.flushRoute(EndpointRouter.addr2pid(addr)); // let's remove the remote route info from the routing table // we do this after we removed the entries from the CM // to avoid that another thread is putting back the entry router.removeRoute(EndpointRouter.addr2pid(addr)); } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "exception during bad route removal", ex); } } } /** * Process the Query, and generate response * * @param query the query to process */ public int processQuery(ResolverQueryMsg query) { if (!useRouteResolver) { // Route resolver disabled return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("processQuery starts"); } RouteQuery routeQuery; try { Reader ip = new StringReader(query.getQuery()); XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, ip); routeQuery = new RouteQuery(asDoc); } catch (RuntimeException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Malformed Route query ", e); } return ResolverService.OK; } catch (IOException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Malformed Route query ", e); } return ResolverService.OK; } PeerID pId = routeQuery.getDestPeerID(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Looking for route to " + pId); } RouteAdvertisement srcRoute = routeQuery.getSrcRoute(); Collection<PeerID> badHops = routeQuery.getBadHops(); if (Logging.SHOW_FINER && LOG.isLoggable(Level.FINER)) { StringBuilder badHopsDump = new StringBuilder("bad Hops :\n"); for (ID aBadHop : badHops) { badHopsDump.append('\t').append(aBadHop); } LOG.finer(badHopsDump.toString()); } // if our source route is not null, then publish it if (srcRoute != null) { if (!(srcRoute.getDestPeerID()).equals(localPeerId)) { // This is not our own peer adv so we must not keep it // longer than its expiration time. try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Publishing sender route info " + srcRoute.getDestPeerID()); } // we only need to publish this route if // we don't know about it yet // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding // incoming messengers. if ((!router.isLocalRoute(EndpointRouter.pid2addr(srcRoute.getDestPeerID()))) && (!router.isRoutedRoute(srcRoute.getDestPeerID()))) { routeCM.publishRoute(srcRoute); } } catch (Exception e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not publish Route Adv from query - discard", e); } return ResolverService.OK; } } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No src Route in route query - discard "); } return ResolverService.OK; } if (pId == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Malformed route query request, no PeerId - discard"); } return ResolverService.OK; } // We have more luck with that one because, since it is part of OUR // message, and not part of the resolver protocol, it is in OUR // format. EndpointAddress qReqAddr = EndpointRouter.pid2addr(pId); RouteAdvertisement route; // check if this peer has a route to the destination // requested boolean found = false; if (qReqAddr.equals(localPeerAddr)) { found = true; // return the route that is my local route route = router.getMyLocalRoute(); } else { // only rendezvous can respond to route requests // if not we are generating too much traffic // XXX: here is where we could be more conservative and use isNormallyReachable() instead, thus excluding // incoming messengers. if (router.isLocalRoute(qReqAddr)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Peer has direct route to destination "); } // we should set the route to something :-) found = true; // this peer has a direct route to the destination // return the short route advertisement we know for this peer // (For us it is zero hop, and we advertise ourself as the routing // peer in the response. The stiching is done by whoever gets that // response). May be there are more than one hop advertised in-there... // alternate routing peers...should we leave them ? // For now, we keep the full dest, but wack the hops. route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); AccessPointAdvertisement ap = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); ap.setPeerID(pId); route.setDest(ap); } else { route = router.getRoute(qReqAddr, false); if (route != null) { found = true; // check if we were given some bad hops info // and see if the found route contains // any of these bad hops. In that case, we need // to mark this route as bad for (PeerID aBadHop : badHops) { // destination is known to be bad if (EndpointRouter.addr2pid(qReqAddr).equals(aBadHop)) { processBadRoute(aBadHop, route); found = false; break; } if (route.containsHop(aBadHop)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Peer has bad route due to " + aBadHop); } processBadRoute(aBadHop, route); found = false; break; } } } } } if (!found) { // discard the request if we are not a rendezvous // else forward to the next peers if (!group.isRendezvous()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("discard query forwarding as not a rendezvous"); } return ResolverService.OK; } // did not find a route, check our srdi cache // make sure we protect against out of sync // SRDI index // srdi forwarding is only involved once the Index entry has // been found and we forwarded the resolver query. Afterward a // normal walk proceeds from the initial SRDI index pointing // rdv. This is done to protect against potential loopback // entries in the SRDI cache index due to out of sync peerview // and index. if (query.getHopCount() < 2) { // check local SRDI cache to see if we have the entry // we look for 10 entries, will pickup one randomly List<PeerID> results = srdiIndex.query("route", RouteAdvertisement.DEST_PID_TAG, pId.toString(), 10); if (results.size() > 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("processQuery srdiIndex lookup match :" + results.size()); } // remove any non-rdv peers to avoid sending // to a non-rdv peers and garbage collect the SRDI // index in the process List<PeerID> clean = cleanupAnyEdges(query.getSrcPeer(), results); if (clean.size() > 0) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("found an srdi entry forwarding query to SRDI peer"); } // The purpose of incrementing the hopcount // when an SRDI index match is found (we got a // pointer to a rdv that should have the route) is to // restrict any further forwarding. The increment // count is only done when a matching SRDI index is // found. Not when the replica is selected as we // still need to forward the query. This restriction // is purposelly done to avoid too many longjumps // within a walk. query.incrementHopCount(); // Note: this forwards the query to 1 peer randomly // selected from the result srdi.forwardQuery(clean, query, 1); // tell the resolver no further action is needed. return ResolverService.OK; } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("did not find a route or SRDI index"); } // force a walk return ResolverService.Repropagate; } // we found a route send the response try { if (route == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("we should have had a route at this point"); } return ResolverService.OK; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("we have a route build route response" + route.display()); } RouteAdvertisement myRoute = router.getMyLocalRoute(); // make sure we initialized our local // route info as we will need it to respond. We may // not have our route if we are still // waiting for a relay connection. if (myRoute == null) { return ResolverService.OK; } RouteResponse routeResponse = new RouteResponse(); routeResponse.setDestRoute(route); routeResponse.setSrcRoute(myRoute); if (routeResponse == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("error creating route response"); } return ResolverService.OK; } // construct a response from the query ResolverResponseMsg res = query.makeResponse(); CurrentCredential current = currentCredential; if (null != current) { res.setCredential(current.credentialDoc); } res.setResponse(routeResponse.toString());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -