📄 endpointrouter.java
字号:
if ((nextTry == null) || (nextTry < TimeUtils.toAbsoluteTimeMillis(MAX_ASYNC_GETMESSENGER_RETRY)) || (TimeUtils.toRelativeTimeMillis(findRouteAt) <= 0)) { // If it is already hopeless (negative cache), just give up. // Otherwise, try and recover the route. If a query is not // already pending, we may trigger a route discovery before we // wait. Else, just wait. The main problem we have here is that // the same may re-enter because the resolver query sent by // findRoute ends up with the rendezvous service trying to // resolve the same destiation if the destination happens to be // the start of the walk. In that situation we will re-enter // at every findRoute attempt until the query becomes "failed". // However, we do want to do more than one findRoute because // just one attempt can fail for totaly fortuitous or temporary // reasons. A tradeoff is to do a very limitted number of attempts // but still more than one. Over the minute for which the query // is not failed, isTimeToRety will return true at most twice // so that'll be a total of three attempts: once every 20 seconds. boolean doFind = false; ClearPendingQuery t; synchronized (pendingQueries) { t = pendingQueries.get(peerID); if (t == null) { doFind = true; t = new ClearPendingQuery(peerID); pendingQueries.put(peerID, t); } else { if (t.isFailed()) { break; } if (t.isTimeToResolveRoute()) { doFind = true; } } } // protect against the async messenger request. We only // look for a route after the first iteration by // that time we will have bailed out from the async call if (doFind) { routeResolver.findRoute(peerAddress); // we do not need to check the CM, route table will // be updated when the route response arrive. This reduces // CM activities when we wait for the route response seekRoute = false; } } // Now, wait. Responses to our query may occur asynchronously. // threads. synchronized (this) { // We can't possibly do everything above while synchronized, // so we could miss an event of interrest. But some changes // are not readily noticeable anyway, so we must wake up // every so often to retry. try { // we only need to wait if we haven't got a messenger // yet. if (destinations.getCurrentMessenger(peerAddress) == null) { wait(ASYNC_MESSENGER_WAIT); } } catch (InterruptedException woken) { Thread.interrupted(); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No route to " + peerAddress); } return null; } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "getGatewayAddress exception", ex); } return null; } } /** * {@inheritDoc} */ @Deprecated public boolean ping(EndpointAddress addr) { EndpointAddress plainAddr = new EndpointAddress(addr, null, null); try { return (getGatewayAddress(plainAddr, true, null) != null); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.FINE, "Ping failure (exception) for : " + plainAddr, e); } return false; } } /** * Receives notifications of new messengers being generated by the * underlying network transports. * <p/> * IMPORTANT: Incoming messengers only. If/when this is used for * outgoing, some things have to change: * <p/> * For example we do not need to send the welcome msg, but for * outgoing messengers, we would need to. * <p/> * Currently, newMessenger handles the outgoing side. * * @param event the new messenger event. */ public boolean messengerReady(MessengerEvent event) { Messenger messenger = event.getMessenger(); Object source = event.getSource(); EndpointAddress logDest = messenger.getLogicalDestinationAddress(); if (source instanceof MessageSender && !((MessageSender) source).allowsRouting()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Ignoring messenger to :" + logDest); } return false; } // We learned that a transport messenger has just been announced. // Nobody else took it, so far, so we'll take it. Incoming messengers // are not pooled by the endpoint service. We do pool them for our // exclusive use. boolean taken = destinations.addIncomingMessenger(logDest, messenger); // Note to maintainers: Do not remove any route or negative cache info // here. Here is why: The messenger we just obtained was made out of an // incoming connection. It brings no proof whatsoever that the peer is // reachable at our initiative. In general, there is nothing to gain in // removing our knowlege of a long route, or a pending route query, or a // triedAndFailed record, other than to force trying a newly obtained // set of addresses. They will not stop us from using this messenger as // long as it works. The only good thing we can do here, is waking up // those that may be waiting for a connection. synchronized (this) { notifyAll(); } return taken; } /** * Call when an asynchronous new messenger could not be obtained. * * @param logDest the failed logical destination */ void noMessenger(EndpointAddress logDest) { // Switch to short timeout if there was an infinite one. // Note if there's one, it is either short or inifinite. So we // look at the value only in the hope it is less expensive // than doing a redundant put. PeerID peerID = addr2pid(logDest); synchronized (this) { Long curr = triedAndFailed.get(peerID); if (curr != null && curr > TimeUtils.toAbsoluteTimeMillis(MAX_ASYNC_GETMESSENGER_RETRY)) { triedAndFailed.put(peerID, TimeUtils.toAbsoluteTimeMillis(MAX_ASYNC_GETMESSENGER_RETRY)); } } } /** * Call when an asynchronous new messenger is ready. * (name is not great). * * @param event the new messenger event. * @return always returns true */ boolean newMessenger(MessengerEvent event) { Messenger messenger = event.getMessenger(); EndpointAddress logDest = messenger.getLogicalDestinationAddress(); // We learned that a new transport messenger has just been announced. // We pool it for our exclusive use. destinations.addOutgoingMessenger(logDest, messenger); // Here's a new connection. Wakeup those that may be waiting for that. synchronized (this) { notifyAll(); } return true; } /** * Get the routed route, if any, for a given peer id. * * @param peerAddress the peer who's route is desired. * @param seekRoute boolean to indicate if we should search for a route * if we don't have one * @return a route advertisement describing the direct route to the peer. */ RouteAdvertisement getRoute(EndpointAddress peerAddress, boolean seekRoute) { ID peerID = addr2pid(peerAddress); // check if we have a valid route RouteAdvertisement route; synchronized (this) { route = routedRoutes.get(peerID); } if (route != null || !seekRoute) { // done return route; } // No known route and we're allowed to search for one // check if there is route advertisement available Iterator<RouteAdvertisement> allRadvs = routeCM.getRouteAdv(peerID); while (allRadvs.hasNext()) { route = allRadvs.next(); Vector<AccessPointAdvertisement> hops = route.getVectorHops(); // no hops, uninterresting: this needs to be a route if (hops.isEmpty()) { continue; } // let's check if we can speak to any of the hops in the route // we try them in reverse order so we shortcut the route // in the process RouteAdvertisement newRoute = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType()); newRoute.setDest(route.getDest().clone()); Vector<AccessPointAdvertisement> newHops = new Vector<AccessPointAdvertisement>(); // build the route from the available hops for (int i = hops.size() - 1; i >= 0; i--) { ID hopID = hops.elementAt(i).getPeerID(); // If the local peer is one of the first reachable // hop in the route, that route is worthless to us. if (localPeerId.equals(hopID)) { break; } EndpointAddress addr = pid2addr(hopID); if (ensureLocalRoute(addr, null) != null) { // we found a valid hop return the corresponding // route from that point for (int j = i; j <= hops.size() - 1; j++) { newHops.add(hops.elementAt(j).clone()); } // make sure we have a real route at the end if (newHops.isEmpty()) { break; } newRoute.setHops(newHops); // try to set the route if (setRoute(newRoute, false)) { // We got one; we're done. return newRoute; } else { // For some reason the route table does not // want that route. Move on to the next adv; it // unlikely that a longer version of the same would // be found good. break; } } } } // no route found return null; } /** * Check if a route is valid. * Currently, only loops are detected. * * @param routeAdvertisement The route to check. * @return {@code true} if the route is valid otherwise {@code false}. */ private boolean checkRoute(RouteAdvertisement routeAdvertisement) { if (0 == routeAdvertisement.size()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("route is empty"); } return false; } if (routeAdvertisement.containsHop(localPeerId)) { // The route does contain this local peer. Using this route // would create a loop. Discard. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("route contains this peer - loopback"); } return false; } PeerID destPid = routeAdvertisement.getDest().getPeerID(); if (routeAdvertisement.containsHop(destPid)) { // May be it is fixable, may be not. See to it. Vector<AccessPointAdvertisement> hops = routeAdvertisement.getVectorHops(); // It better be the last hop. Else this is a broken route. hops.remove(hops.lastElement()); if (routeAdvertisement.containsHop(destPid)) { // There was one other than last: broken route. return false; } } if (routeAdvertisement.hasALoop()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("route has a loop "); } return false; } else { // Seems to be a potential good route. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -