📄 endpointrouter.java
字号:
LOG.debug("No route to " + pId); } return null; } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("getGatewayAddress exception", ex); } return null; } } /** * Returns true if the target address is reachable. Otherwise * returns false. */ public boolean ping(EndpointAddress addr) { EndpointAddress plainAddr = new EndpointAddress(addr, null, null); try { return (getGatewayAddress(plainAddr, true) != null); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.debug("Ping failure (exception) for : " + plainAddr, e); } return false; } } /** * Receives notifications of new messengers being generated by the * underlying network transports. * * <p/>IMPORTANT: Incoming messengers only. If/when this is used for * outgoing, some things have to change: * * <p/>For example we do not need to send the welcome msg, but * for outgoing messengers, we would need to. * * <p/>Currently, newMessenger handles the outgoing side. * * @param event the new messenger event. */ public boolean messengerReady(MessengerEvent event) { Messenger messenger = event.getMessenger(); Object source = event.getSource(); EndpointAddress logDest = messenger.getLogicalDestinationAddress(); if (source instanceof MessageSender && !((MessageSender) source).allowsRouting()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Ignoring messenger to :" + logDest); } return false; } // We learned that a transport messenger has just been // announced. Noone else took it, so far, so we'll take // it. Incoming messengers are not pooled by the endpoint // service. We do pool them for our exclusive use. boolean taken = destinations.addIncomingMessenger(logDest, messenger); // Note to maintainers: Do not remove any route or negative // cache info here. Here is why: The messenger we just // obtained was made out of an incoming connection. It brings // no proof whatsoever that the peer is reachable at our // initiative. In general, there is nothing to gain in // removing our knowlege of a long route, or a pending route // query, or a triedAndFailed record, other than to force // trying a newly obtained set of addresses. They will not // stop us from using this messenger as long as it works. // The only good thing we can do here, is waking up those // that may be waiting for a connection. synchronized (this) { notifyAll(); } return taken; } /** * call when an asynchronous new messenger could not be obtained. * * @param logDest the failed logical destination */ public void noMessenger(EndpointAddress logDest) { // Switch to short timeout if there was an infinite one. // Note if there's one, it is either short or inifinite. So we // look at the value only in the hope it is less expensive // than doing a redundant put. synchronized (this) { Long curr = (Long) triedAndFailed.get(logDest); if (curr != null && curr.longValue() > TimeUtils.toAbsoluteTimeMillis(MAXASYNC_GETMESSENGER_RETRY)) { triedAndFailed.put(logDest, new Long(TimeUtils.toAbsoluteTimeMillis(MAXASYNC_GETMESSENGER_RETRY))); } } } /** * call when an asynchronous new messenger is ready. * (name is not great). * * @param event the new messenger event. */ public boolean newMessenger(MessengerEvent event) { Messenger messenger = event.getMessenger(); EndpointAddress logDest = messenger.getLogicalDestinationAddress(); // We learned that a new transport messenger has just been announced. // We pool it for our exclusive use. destinations.addOutgoingMessenger(logDest, messenger); // Here's a new connection. Wakeup those that may be waiting // for that. synchronized (this) { notifyAll(); } return true; } /** * Get the routed route, if any, for a given peer id. * * @param pId the peer who's route is desired. * @param seekRoute boolean to indicate if we should search for a route * if we don't have one * @return a route advertisement describing the direct route to the peer. */ protected RouteAdvertisement getRoute(EndpointAddress pId, boolean seekRoute) { // check if we have a valid route RouteAdvertisement route = null; synchronized (this) { route = (RouteAdvertisement) routedRoutes.get(pId); } if (route != null || !seekRoute) { // done return route; } // No known route and we're allowed to search for one // check if there is route advertisement available Iterator allRadvs = routeCM.getRouteAdv(pId); if( null == allRadvs ) { return null; } while (allRadvs.hasNext()) { route = (RouteAdvertisement) allRadvs.next(); // let's check if we can speak to any of the hops in the route // we try them in reverse order so we shortcut the route // in the process RouteAdvertisement newRoute = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); newRoute.setDest((AccessPointAdvertisement) route.getDest().clone()); Vector hops = route.getVectorHops(); Vector newHops = new Vector(); // no hops, uninterresting: this needs to be a route if (hops.size() == 0) { continue; } // build the route from the available hops for (int i = hops.size() - 1; i >= 0; i--) { EndpointAddress addr = pid2addr(((AccessPointAdvertisement) hops.elementAt(i)).getPeerID()); // If the local peer is one of the first reachable // hop in the route, that route is worthless to us. if (addr.equals(localPeerAddr)) { break; } if (ensureLocalRoute(addr, null) != null) { // we found a valid hop return the corresponding // route from that point for (int j = i; j <= hops.size() - 1; j++) { newHops.add(((AccessPointAdvertisement) hops.elementAt(j)).clone()); } // make sure we have a real route at the end if (newHops.size() == 0) { break; } newRoute.setHops(newHops); // try to set the route if (setRoute(newRoute, false)) { // We got one; we're done. return newRoute; } else { // For some reason the route table does not // want that route. Move on to the next adv; it // unlikely that a longer version of the same would // be found good. break; } } } } // no route found return null; } // Check if a route is valid. // Currently, only loops are detected. private boolean checkRoute(RouteAdvertisement r) { if (r == null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("route is null"); } return false; } if (r.size() == 0) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("route is empty"); } return false; } if (r.containsHop((PeerID) localPeerId)) { // The route does contain this local peer. Using this route // would create a loop. Discard. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("route contains this peer - loopback"); } return false; } PeerID destPid = r.getDest().getPeerID(); if (r.containsHop(destPid)) { // May be it is fixable, may be not. See to it. Vector hops = r.getVectorHops(); // It better be the last hop. Else this is a broken route. hops.remove(hops.lastElement()); if (r.containsHop(destPid)) { // There was one other than last: broken route. return false; } } if (r.hasALoop()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("route has a loop "); } return false; } else { // Seems to be a potential good route. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("route is ok"); } return true; } } // Adds a new long route provided there not a direct one already. // Replaces any longer route. return true if the route was truely new. // The whole deal must be synch. We do not want to add a long route // while a direct one is being added in parallell or other stupid things like that. /** * set new route info * * @param r new route to learn * @param force true if the route was optained by receiving * a message */ protected boolean setRoute(RouteAdvertisement r, boolean force) { PeerID pid = null; EndpointAddress pidAddr = null; boolean pushNeeded = false; boolean status = false; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("setRoute:"); } if (r == null) { return false; } synchronized (this) { try { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug(r.display()); } pid = r.getDest().getPeerID(); pidAddr = pid2addr(pid); // Check if we are in the case where we are // setting a new route as we received a message // always force the new route setup when we received a // a message if (!force) { // check if we have some bad NACK route info for // this destination BadRoute badRoute = (BadRoute) badRoutes.get(pidAddr); if (badRoute != null) { Long nextTry = badRoute.getExpiration(); if (nextTry.longValue() > System.currentTimeMillis()) { // check if the route we have in the NACK cache match the // new one. Need to make sure that we clean the route // from any endpoint addresses as the badRoute cache only // contains PeerIDs RouteAdvertisement routeClean = (RouteAdvertisement) r.cloneOnlyPIDs(); if (routeClean.equals(badRoute.getRoute())) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("try to use a known bad route"); } return false; } } else { // expired info, just flush NACK route cache badRoutes.remove(pidAddr); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -