📄 routeresolver.java
字号:
// construct a response from the query ResolverResponseMsg res = query.makeResponse(); res.setCredential(credentialDoc); res.setResponse(routeResponse.toString()); resolver.sendResponse(query.getSrc(), res); return ResolverService.OK; } catch (Exception ee) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("processQuery: error while processing query ", ee); } return ResolverService.OK; } } /** * Return a route error in case a route was found to be invalid * as the current hop cannot find a way to forward the message to the * destination or any other hops in the forward part of the route. * In that case a negative route response is forwarded * to the orginal source of the message. Now of course we * do not have any way to guarantee that the NACK message will be * received by the sender, but the best we can do is to try :-) * * we send a query ID to NACKROUTE_QUERYID to indicate * this is a bad Route * * @param src original source of the message * @param dest original destination of the message */ protected void generateNACKRoute(PeerID src, PeerID dest, Vector origHops) { // As long as the group is partially initialized, do not bother // trying to send NACKS. We can't: it just causes NPEs. if (resolver == null) { return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("generate NACK Route response " + src); } // check first, if we are not already in process of looking for a // route to the destination peer of the NACK. We should not try to // send a NACK to that destination at that point as this will block // our incoming processing thread while it is looking for a route to // that destination. If there a no pending route requests to that // destination then we can go ahead an attempt to send the NACK. At // the maximum we should have only one thread block while looking for // a specific destination. When we find a route to the destination, // the next NACK processing will be sent. if (router.isPendingRouteQuery(router.pid2addr(src))) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("drop NACK due to pending route discovery " + src); } return; } // Generate a route response RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); AccessPointAdvertisement ap = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); ap.setPeerID(dest); route.setDest(ap); route.setHops(origHops); // set the the route of the peer that // detected the bad route RouteAdvertisement routeSrc = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); AccessPointAdvertisement apSrc = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); apSrc.setPeerID((PeerID) localPeerId); routeSrc.setDest(apSrc); RouteResponse routeResponse = new RouteResponse(); routeResponse.setDestRoute(route); routeResponse.setSrcRoute(routeSrc); ResolverResponse res = new ResolverResponse(routerSName, credentialDoc, NACKROUTE_QUERYID, // mean NACKRoute routeResponse.toString()); // send the NACK response back to the originator resolver.sendResponse(src.toString(), res); } /** * process an SRDI message request * * @param message SRDI resolver message */ public boolean processSrdi(ResolverSrdiMsg message) { String value; SrdiMessage srdiMsg; try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received a SRDI messsage in group" + group.getPeerGroupName()); } XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload())); srdiMsg = new SrdiMessageImpl(asDoc); } catch (Exception e) { // we don't understand this msg, let's skip it if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("corrupted SRDI message", e); } return false; } PeerID pid = srdiMsg.getPeerID(); // filter messages that contain messages // about the local peer, so we don't enter // self-reference if (pid.equals(localPeerId)) { return false; } Iterator eachEntry = srdiMsg.getEntries().iterator(); while (eachEntry.hasNext()) { SrdiMessage.Entry entry = (SrdiMessage.Entry) eachEntry.next(); // drop any information about ourself if (entry.key.equals(localPeerId.toString())) { continue; } value = entry.value; if (value == null) { value = ""; } // Expiration of entries is taken care of by SrdiIdex, so we always add // FIXME hamada 03142003 // All routes are added under the secondary key 'DstPID', it would be more correct to // Specify it in the message, but since versioning is not yet supported the following is // acceptable, since it is localized srdiIndex.add(srdiMsg.getPrimaryKey(), "DstPID", entry.key, pid, entry.expiration); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [DstPID]" + " value [" + entry.key + "] exp [" + entry.expiration + "]"); } } return true; } /** * SrdiInterface */ public void pushEntries(boolean all) { // only send to the replica pushSrdi(null, all); } /* * push all srdi entries to the rednezvous SRDI cache (new connection) *@param all if true push all entries, otherwise just deltas */ protected void pushSrdi(String peer, boolean all) { SrdiMessage srdiMsg; Vector routeIx = new Vector(); // 10182002tra:Route info don't expire unless the peer disappears // This approach is used to limit the SRDI traffic. The key // point here is that SRDI is used to tell a peer that another // has a route to the destination it is looking for. The information // that SRDI cache is not so much the specific route info but rather // the fact that a peer has knowledge of a route to another peer // We don't want to update the SRDI cache on every route update. // The SRDI cache will be flushed when the peer disconnect from // the rendezvous. // We cannot support concurrent modification of the map while we // do that: we must synchronize... for (Iterator each = router.getAllRoutedRouteAddresses(); each.hasNext();) { PeerID pid = router.addr2pid((EndpointAddress) each.next()); SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE); routeIx.addElement(entry); } try { // check if we have anything to send if (routeIx.size() == 0) { return; } srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // one hop "route", routeIx); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending a SRDI messsage of [All=" + all + "] routes"); } // this will replicate entry to the SRDI replica peers srdi.replicateEntries(srdiMsg); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("SRDI Push failed", e); } } } /* * push srdi entries to the SRDI rendezvous cache * @param all if true push all entries, otherwise just deltas */ protected void pushSrdi(ID peer, PeerID id) { SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop "route", id.toString(), null, new Long(Long.MAX_VALUE).longValue()); // maximum expiration // 10182002tra:Route info don't expire unless the peer disappears // This approach is used to limit the SRDI traffic. The key // point here is that SRDI is used to tell a peer that another // has a route to the destination it is looking for. The information // that SRDI cache is not so much the specific route info but rather // the fact that a peer has knowledge of a route to another peer // We don't want to update the SRDI cache on every route update. // The SRDI cache will be flushed when the peer disconnect from // the rendezvous. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("sending a router SRDI message add route " + id); } if (peer == null) { PeerID destPeer = srdi.getReplicaPeer(id.toString()); peer = destPeer; } // don't push anywhere if we do not have a replica // or we are trying to send the query to ourself if (!localPeerId.equals(peer)) { srdi.pushSrdi(peer, srdiMsg); } } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("SRDI push failed", e); } } } /** * remove a SRDI cache entry * * @param peer peer id we send the request, null for sending to all * @param id peer id of the SRDI route that we want to remove * from the cache */ protected void removeSrdi(String peer, PeerID id) { SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop "route", id.toString(), null, // 0 means remove new Long(0).longValue()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("sending a router SRDI message delete route " + id); } if (peer == null) { PeerID destPeer = srdi.getReplicaPeer(id.toString()); // don't push anywhere if we do not have replica // or we are trying to push to ouself if (destPeer != null && (!destPeer.equals(localPeerId))) { srdi.pushSrdi(destPeer, srdiMsg); } } } catch (Exception e) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Removing srdi entry failed", e); } } } /** * {@inheritDoc} **/ public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) { // when the resolver failed to send, we get a notification and // flush the SRDI cache entries for that destination removeSrdiIndex(peerid); } /** cleanup any edge peers when trying to forward an SRDI query * so we are guaranteed to the best of our knowledge that * the peer is a rendezvous. This is not perfect, as it may * take time for the peerview to converge but at least * we can remove any peers that is not a rendezvous. **/ protected Vector cleanupAnyEdges(String src, Vector results) { Vector clean = new Vector(results.size()); PeerID pid = null; // put the peerview as a vector of PIDs Vector rpvId = srdi.getGlobalPeerView(); // remove any peers not in the current peerview // these peers may be gone or have become edges for (int i = 0; i < results.size(); i++) { pid = (PeerID) results.elementAt(i); // eliminate the src of the query so we don't resend // the query to whom send it to us if (src.equals(pid.toString())) { continue; } // remove the local also, so we don't send to ourself if (localPeerId.equals(pid)) { continue; } if (rpvId.contains(pid)) { // ok that's a good RDV to the best if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("valid rdv for SRDI forward " + pid); } clean.add(pid); } else { // cleanup our SRDI cache for that peer srdiIndex.remove(pid); } } return clean; } /** * return the global peerview * * @return Vector of RDV member of the peerview */ protected Vector getGlobalPeerView() { return srdi.getGlobalPeerView(); } /** * remove SRDI index * * @param pid of the index to be removed */ protected void removeSrdiIndex(PeerID pid) { srdiIndex.remove(pid); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -