📄 endpointrouter.java
字号:
if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: " + implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : " + implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : " + implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : " + implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : " + group.getPeerGroupName()); configInfo.append("\n\t\tGroup ID : " + group.getPeerGroupID()); configInfo.append("\n\t\tPeer ID : " + group.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tProtocol : " + getProtocolName() ); configInfo.append("\n\t\tPublic Address : " + localPeerAddr); configInfo.append("\n\t\tUse Cm : " + routeCM.useRouteCM()); configInfo.append("\n\t\tUse RouteResolver : " + routeResolver.useRouteResolver()); LOG.info(configInfo); } } /** * {@inheritDoc} */ public int startApp(String[] arg) { int status = 0; // FIXME tra 20031015 Should be started as a service // when refactored work completed status = routeCM.startApp(arg); if (status != 0) { return status; } // FIXME tra 20031015 is there a risk for double // registration when startApp() is recalled // due to failure to get the discovery service // in the previous statement. // NOTE: Endpoint needs to be registered before // we register the endpoint resolver. This is // bringing a more complex issue of service // loading dependencies. endpoint.addMessengerEventListener(this, EndpointService.MediumPrecedence); // FIXME tra 20031015 Should be started as a service // when refactored completed status = routeResolver.startApp(arg); if (status != 0) { return status; } // publish my local route adv routeCM.publishRoute(getMyLocalRoute()); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Router Message Transport started"); } return status; } /** * {@inheritDoc} * * <p/>Careful that stopApp() could in theory be called before startApp(). */ public void stopApp() { if (endpoint != null) { endpoint.removeIncomingMessageListener(routerSName, null); endpoint.removeMessengerEventListener(this, EndpointService.MediumPrecedence); endpoint.removeMessageTransport(this); endpoint = null; } // FIXME tra 20030818 should be unloaded as a service routeCM.stopApp(); // FIXME tra 20030818 should be unloaded as a service routeResolver.stopApp(); destinations.close(); timer.cancel(); if (LOG.isEnabledFor(Level.INFO)) { LOG.info("Router Message Transport stopped"); } } /** * {@inheritDoc} */ public boolean isConnectionOriented() { return false; } /** * {@inheritDoc} */ public boolean allowsRouting() { // Yes, this is the router, and it does not allow routing. // Otherwise we would have a chicken and egg problem. return false; } /** * {@inheritDoc} */ public EndpointService getEndpointService() { return endpoint; } /** * {@inheritDoc} */ public EndpointAddress getPublicAddress() { return (EndpointAddress) localPeerAddr.clone(); } /** * {@inheritDoc} */ public Iterator getPublicAddresses() { return Collections.singletonList(getPublicAddress()).iterator(); } /** * {@inheritDoc} */ public String getProtocolName() { return routerPName; } /** *{@inheritDoc} */ public boolean isPropagateEnabled() { return false; } /** *{@inheritDoc} */ public boolean isPropagationSupported() { return false; } /** * {@inheritDoc} */ public void propagate(Message srcMsg, String pName, String pParam, String prunePeer) throws IOException { // All messages are lost in the ether } /** * Given a peer id, return an address to reach that peer. * The address may be for a directly reachable peer, or * for the first gateway along a route to reach the peer. * If we do not have a route to the peer, we will use the * Peer Routing Protocol to try to discover one. We will * wait up to 30 seconds for a route to be discovered. * * @param dest the peer we are trying to reach. * @param seekRoute whether to go as far as issuing a route query, or just fish in our cache. * when forwarding a message we allow ourselves to mend a broken source-issued route but we * won't go as far as seeking one from other peers. When originating a message, on the other end * we will aggressively try to find route. * @param hint whether we are passed a route hint to be used, in that case that route * hint should be used * * @return an EndpointAddress at which that peer should be reachable. */ EndpointAddress getGatewayAddress(EndpointAddress dest, boolean seekRoute) { return getGatewayAddress(dest, seekRoute, null); } EndpointAddress getGatewayAddress(EndpointAddress dest, boolean seekRoute, Object hint) { try { EndpointAddress pId = new EndpointAddress(dest, null, null); // FIXME: jice@jxta.org - 20021215 replace that junk with a background // task; separate the timings of route disco from the timeouts of // the requesting threads. EndpointAddress result = null; if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Searching local" + (seekRoute ? " & remote" : "") + " for route for " + pId); } // If we can't get a route within the timeout, give up for now. long quitAt = TimeUtils.toAbsoluteTimeMillis(MAXFINDROUTE_TIMEOUT); // Time we need to wait before we can start issue a find route request // to give a chance for the async messenger to respond (success or failure) long findRouteAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT); EndpointAddress addr = null; while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) { // Then check if by any chance we can talk to it directly. if (ensureLocalRoute(pId, hint) != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Found direct address " + pId); } return pId; } // Otherwise, look for a long route. // check if we got a hint. If that's the case use it RouteAdvertisement route = null; if (hint != null) { route = (RouteAdvertisement) hint; } else { route = getRoute(pId, seekRoute); } if (route != null && route.size() > 0) { addr = pid2addr(route.getLastHop().getPeerID()); if (ensureLocalRoute(addr, null) != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Found last hop remote address: " + pId + " -> " + route.getLastHop().getPeerID()); } // Ensure local route removes negative cache info about // addr. We also need to remove that about pId. return addr; } else { // need to try the first hop addr = pid2addr(route.getFirstHop().getPeerID()); if (ensureLocalRoute(addr, null) != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Found first hop remote address first hop: " + pId + " -> " + route.getFirstHop().getPeerID()); } // Ensure local route removes negative cache info about // addr. return addr; } else { removeRoute(pId); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Found no reachable route to " + pId); } } } } // For messages we didn't originate we don't seek routes. if (!seekRoute) { break; } // Check that route resolution is enabled if // not then bail out, there is nothing more // that we can do. if (!routeResolver.useRouteResolver()) { break; } // due to the asynchronous nature of getting our messenger we // need to handle the multi-entrance of issueing a route // discovery. A route discovery needs to be generated only // either if we have no pending request (it completed or we had // no information so we did not created one), or we tried and // we failed, or we waited at least ASYNC_MESSENGER_WAIT to get // a chance for the async request to respond before we can // issue the route discovery Long nextTry = (Long) triedAndFailed.get(pId); if ((nextTry == null) || (nextTry.longValue() < TimeUtils.toAbsoluteTimeMillis(MAXASYNC_GETMESSENGER_RETRY)) || (TimeUtils.toRelativeTimeMillis(findRouteAt) <= 0)) { // If it is already hopeless (negative cache), just give up. // Otherwise, try and recover the route. If a query is not // already pending, we may trigger a route discovery before we // wait. Else, just wait. The main problem we have here is that // the same may re-enter because the resolver query sent by // findRoute ends up with the rendezvous service trying to // resolve the same destiation if the destination happens to be // the start of the walk. In that situation we will re-enter // at every findRoute attempt until the query becomes "failed". // However, we do want to do more than one findRoute because // just one attempt can fail for totaly fortuitous or temporary // reasons. A tradeoff is to do a very limitted number of attempts // but still more than one. Over the minute for which the query // is not failed, isTimeToRety will return true at most twice // so that'll be a total of three attempts: once every 20 seconds. boolean doFind = false; ClearPendingQuery t = null; synchronized (this) { t = (ClearPendingQuery) pendingQueries.get(pId); if (t == null) { doFind = true; t = new ClearPendingQuery(pId); pendingQueries.put(pId, t); } else { if (t.isFailed()) { break; } if (t.isTimeToRetry()) { doFind = true; } } } // protect against the async messenger request. We only // look for a route after the first iteration by // that time we will have bailed out from the async call if (doFind) { routeResolver.findRoute(pId); // we do not need to check the CM, route table will // be updated when the route response arrive. This reduces // CM activities when we wait for the route response seekRoute = false; } } // Now, wait. Responses to our query may occur asynchronously. // threads. synchronized (this) { // We can't possibly do everything above while synchronized, // so we could miss an event of interrest. But some changes // are not readily noticeable anyway, so we must wake up // every so often to retry. try { // we only need to wait if we haven't got a messenger // yet. if (destinations.getCurrentMessenger(pId) == null) { wait(ASYNC_MESSENGER_WAIT); } } catch (InterruptedException woken) { Thread.interrupted(); } } } if (LOG.isEnabledFor(Level.DEBUG)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -