📄 routeresolver.java
字号:
resolver.sendResponse(query.getSrcPeer().toString(), res); return ResolverService.OK; } catch (Exception ee) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "processQuery: error while processing query ", ee); } return ResolverService.OK; } } /** * Return a route error in case a route was found to be invalid * as the current hop cannot find a way to forward the message to the * destination or any other hops in the forward part of the route. * In that case a negative route response is forwarded * to the original source of the message. Now of course we * do not have any way to guarantee that the NACK message will be * received by the sender, but the best we can do is to try :-) * <p/> * we send a query ID to NACKROUTE_QUERYID to indicate * this is a bad Route * * @param src original source of the message * @param dest original destination of the message * @param origHops original hops */ protected void generateNACKRoute(PeerID src, PeerID dest, Vector<AccessPointAdvertisement> origHops) { // As long as the group is partially initialized, do not bother // trying to send NACKS. We can't: it just causes NPEs. if (resolver == null) { return; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("generate NACK Route response " + src); } // check first, if we are not already in process of looking for a // route to the destination peer of the NACK. We should not try to // send a NACK to that destination at that point as this will block // our incoming processing thread while it is looking for a route to // that destination. If there a no pending route requests to that // destination then we can go ahead an attempt to send the NACK. At // the maximum we should have only one thread block while looking for // a specific destination. When we find a route to the destination, // the next NACK processing will be sent. if (router.isPendingRouteQuery(src)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("drop NACK due to pending route discovery " + src); } return; } // Generate a route response RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); AccessPointAdvertisement ap = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); ap.setPeerID(dest); route.setDest(ap); route.setHops(origHops); // set the the route of the peer that // detected the bad route RouteAdvertisement routeSrc = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); AccessPointAdvertisement apSrc = (AccessPointAdvertisement) AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType()); apSrc.setPeerID((PeerID) localPeerId); routeSrc.setDest(apSrc); RouteResponse routeResponse = new RouteResponse(); routeResponse.setDestRoute(route); routeResponse.setSrcRoute(routeSrc); ResolverResponse res = new ResolverResponse(); res.setHandlerName(routerSName); CurrentCredential current = currentCredential; if (null != current) { res.setCredential(current.credentialDoc); } res.setQueryId(NACKROUTE_QUERYID); res.setResponse(routeResponse.toString()); // send the NACK response back to the originator resolver.sendResponse(src.toString(), res); } /** * process an SRDI message request * * @param message SRDI resolver message */ public boolean processSrdi(ResolverSrdiMsg message) { if(!group.isRendezvous()) { return true; } String value; SrdiMessage srdiMsg; try { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Received a SRDI messsage in group" + group.getPeerGroupName()); } XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload())); srdiMsg = new SrdiMessageImpl(asDoc); } catch (Exception e) { // we don't understand this msg, let's skip it if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "corrupted SRDI message", e); } return false; } PeerID pid = srdiMsg.getPeerID(); // filter messages that contain messages // about the local peer, so we don't enter // self-reference if (pid.equals(localPeerId)) { return false; } for (SrdiMessage.Entry entry : srdiMsg.getEntries()) { // drop any information about ourself if (entry.key.equals(localPeerId.toString())) { continue; } value = entry.value; if (value == null) { value = ""; } // Expiration of entries is taken care of by SrdiIdex, so we always add // FIXME hamada 20030314 // All routes are added under the secondary key 'DstPID', it would be more correct to // Specify it in the message, but since versioning is not yet supported the following is // acceptable, since it is localized srdiIndex.add(srdiMsg.getPrimaryKey(), RouteAdvertisement.DEST_PID_TAG, entry.key, pid, entry.expiration); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [RouteAdvertisement.DEST_PID_TAG]" + " value [" + entry.key + "] exp [" + entry.expiration + "]"); } } return true; } /** * {@inheritDoc} */ public void pushEntries(boolean all) { // only send to the replica pushSrdi(null, all); } /* * push all srdi entries to the rednezvous SRDI cache (new connection) * *@param all if true push all entries, otherwise just deltas */ protected void pushSrdi(String peer, boolean all) { SrdiMessage srdiMsg; Vector<SrdiMessage.Entry> routeIx = new Vector<SrdiMessage.Entry>(); // 20021018 tra:Route info don't expire unless the peer disappears // This approach is used to limit the SRDI traffic. The key // point here is that SRDI is used to tell a peer that another // has a route to the destination it is looking for. The information // that SRDI cache is not so much the specific route info but rather // the fact that a peer has knowledge of a route to another peer // We don't want to update the SRDI cache on every route update. // The SRDI cache will be flushed when the peer disconnect from // the rendezvous. // We cannot support concurrent modification of the map while we // do that: we must synchronize... for (Iterator<ID> each = router.getAllRoutedRouteAddresses(); each.hasNext();) { ID pid = each.next(); SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE); routeIx.addElement(entry); } try { // check if we have anything to send if (routeIx.size() == 0) { return; } srdiMsg = new SrdiMessageImpl(group.getPeerID(), // one hop 1, "route", routeIx); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending a SRDI messsage of [All=" + all + "] routes"); } // this will replicate entry to the SRDI replica peers srdi.replicateEntries(srdiMsg); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "SRDI Push failed", e); } } } /* * push srdi entries to the SRDI rendezvous cache * @param all if true push all entries, otherwise just deltas */ protected void pushSrdi(ID peer, PeerID id) { SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop "route", id.toString(), null, Long.MAX_VALUE); // maximum expiration // 20021018 tra:Route info don't expire unless the peer disappears // This approach is used to limit the SRDI traffic. The key // point here is that SRDI is used to tell a peer that another // has a route to the destination it is looking for. The information // that SRDI cache is not so much the specific route info but rather // the fact that a peer has knowledge of a route to another peer // We don't want to update the SRDI cache on every route update. // The SRDI cache will be flushed when the peer disconnect from // the rendezvous. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("sending a router SRDI message add route " + id); } if (peer == null) { PeerID destPeer = srdi.getReplicaPeer(id.toString()); peer = destPeer; } // don't push anywhere if we do not have a replica // or we are trying to send the query to ourself if (!localPeerId.equals(peer)) { srdi.pushSrdi(peer, srdiMsg); } } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "SRDI push failed", e); } } } /** * remove a SRDI cache entry * * @param peer peer id we send the request, null for sending to all * @param id peer id of the SRDI route that we want to remove * from the cache */ protected void removeSrdi(String peer, PeerID id) { SrdiMessage srdiMsg; try { srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop "route", id.toString(), null, // 0 means remove 0); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("sending a router SRDI message delete route " + id); } if (peer == null) { PeerID destPeer = srdi.getReplicaPeer(id.toString()); // don't push anywhere if we do not have replica // or we are trying to push to ouself if (destPeer != null && (!destPeer.equals(localPeerId))) { srdi.pushSrdi(destPeer, srdiMsg); } } } catch (Exception e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Removing srdi entry failed", e); } } } /** * {@inheritDoc} */ public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) { // when the resolver failed to send, we get a notification and // flush the SRDI cache entries for that destination removeSrdiIndex(peerid); } /** * cleanup any edge peers when trying to forward an SRDI query so we are * guaranteed to the best of our knowledge that the peer is a rendezvous. * This is not perfect, as it may take time for the peerview to converge but * at least we can remove any peers that is not a rendezvous. * * @param src source * @param results vector of PeerIDs * @return cleaned up vector of PeerIDs */ protected List<PeerID> cleanupAnyEdges(ID src, List<PeerID> results) { List<PeerID> clean = new ArrayList<PeerID>(results.size()); // put the peerview as a vector of PIDs List<PeerID> rpvId = srdi.getGlobalPeerView(); // remove any peers not in the current peerview // these peers may be gone or have become edges for (PeerID pid : results) { // eliminate the src of the query so we don't resend // the query to whom send it to us if (src.equals(pid)) { continue; } // remove the local also, so we don't send to ourself if (localPeerId.equals(pid)) { continue; } if (rpvId.contains(pid)) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("valid rdv for SRDI forward " + pid); } clean.add(pid); } else { // cleanup our SRDI cache for that peer srdiIndex.remove(pid); } } return clean; } /** * remove SRDI index * * @param pid of the index to be removed */ protected void removeSrdiIndex(PeerID pid) { srdiIndex.remove(pid); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -