📄 endpointrouter.java
字号:
timer.schedule(new TimerThreadNamer("EndpointRouter Timer for " + group.getPeerGroupID()), 0); this.group = group; this.assignedID = assignedID; ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl; localPeerId = group.getPeerID(); localPeerAddr = pid2addr(group.getPeerID()); if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring Router Transport : " + assignedID); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tGroup : ").append(group); configInfo.append("\n\t\tPeer ID : ").append(group.getPeerID()); configInfo.append("\n\tConfiguration :"); configInfo.append("\n\t\tProtocol : ").append(getProtocolName()); configInfo.append("\n\t\tPublic Address : ").append(localPeerAddr); LOG.config(configInfo.toString()); } } /** * {@inheritDoc} */ public synchronized int startApp(String[] arg) { endpoint = group.getEndpointService(); if (null == endpoint) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Stalled until there is an endpoint service"); } return START_AGAIN_STALLED; } Service needed = group.getResolverService(); if (null == needed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Endpoint Router start stalled until resolver service available"); } return Module.START_AGAIN_STALLED; } needed = group.getMembershipService(); if (null == needed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Endpoint Router start stalled until membership service available"); } return Module.START_AGAIN_STALLED; } needed = group.getRendezVousService(); if (null == needed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Endpoint Router start stalled until rendezvous service available"); } return Module.START_AGAIN_STALLED; } destinations = new Destinations(endpoint); try { // FIXME tra 20030818 Should be loaded as a service // when we have service dependency. When loaded as a true service should // not have to pass the EnpointRouter object. The issue is we need an // api to obtain the real object from the PeerGroup API. routeCM = new RouteCM(); // FIXME tra 20030818 Should be loaded as a service // when we have service dependency. When loaded as a true service should // not have to pass the EnpointRouter object. The issue is we need an // api to obtain the real object from the PeerGroup API. routeResolver = new RouteResolver(this); // initialize persistent CM route Cache // FIXME tra 20030818 Should be loaded as service when complete // refactoring is done. routeCM.init(group, assignedID, null); // initialize the route resolver // FIXME tra 20030818 Should be loaded as service when complete // refactoring is done. routeResolver.init(group, assignedID, null); } catch (PeerGroupException failure) { return -1; } int status; // FIXME tra 20031015 Should be started as a service when refactored work // completed status = routeCM.startApp(arg); if (status != 0) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Route CM failed to start : " + status); } return status; } // FIXME tra 20031015 Should be started as a service when refactored work // completed status = routeResolver.startApp(arg); if (status != 0) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Route Resolver failed to start : " + status); } return status; } // publish my local route adv routeCM.publishRoute(getMyLocalRoute()); // FIXME tra 20031015 is there a risk for double registration when // startApp() is recalled due to failure to get the discovery service // by the Route Resolver service. // NOTE: Endpoint needs to be registered before we register the endpoint // resolver. This is bringing a more complex issue of service loading // dependencies. endpoint.addMessengerEventListener(this, EndpointService.MediumPrecedence); endpoint.addIncomingMessageListener(this, ROUTER_SERVICE_NAME, null); if (endpoint.addMessageTransport(this) == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("Transport registration refused"); } return -1; } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info(group + " : Router Message Transport started."); } return status; } /** * {@inheritDoc} * <p/> * Careful that stopApp() could in theory be called before startApp(). */ public synchronized void stopApp() { stopped = true; if (endpoint != null) { endpoint.removeIncomingMessageListener(ROUTER_SERVICE_NAME, null); endpoint.removeMessengerEventListener(this, EndpointService.MediumPrecedence); endpoint.removeMessageTransport(this); } // FIXME tra 20030818 should be unloaded as a service routeCM.stopApp(); // FIXME tra 20030818 should be unloaded as a service routeResolver.stopApp(); destinations.close(); if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info(group + " : Router Message Transport stopped."); } } /** * {@inheritDoc} */ public boolean isConnectionOriented() { return false; } /** * {@inheritDoc} */ public boolean allowsRouting() { // Yes, this is the router, and it does not allow routing. // Otherwise we would have a chicken and egg problem. return false; } /** * {@inheritDoc} */ public EndpointService getEndpointService() { return endpoint; } /** * {@inheritDoc} */ public EndpointAddress getPublicAddress() { return localPeerAddr; } /** * {@inheritDoc} */ public Iterator<EndpointAddress> getPublicAddresses() { return Collections.singletonList(getPublicAddress()).iterator(); } /** * {@inheritDoc} */ public String getProtocolName() { return ROUTER_PROTOCOL_NAME; } /** * Given a peer id, return an address to reach that peer. * The address may be for a directly reachable peer, or * for the first gateway along a route to reach the peer. * If we do not have a route to the peer, we will use the * Peer Routing Protocol to try to discover one. We will * wait up to 30 seconds for a route to be discovered. * * @param peerAddress the peer we are trying to reach. * @param seekRoute whether to go as far as issuing a route query, or just fish in our cache. * when forwarding a message we allow ourselves to mend a broken source-issued route but we * won't go as far as seeking one from other peers. When originating a message, on the other end * we will aggressively try to find route. * @param hint whether we are passed a route hint to be used, in that case that route * hint should be used * @return an EndpointAddress at which that peer should be reachable. */ EndpointAddress getGatewayAddress(EndpointAddress peerAddress, boolean seekRoute, RouteAdvertisement hint) { PeerID peerID = addr2pid(peerAddress); try { // FIXME 20021215 jice Replace this junk with a background task; // separate the timings of route disco from the timeouts of // the requesting threads. EndpointAddress result = null; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Searching local" + (seekRoute ? " & remote" : "") + " for route for " + peerAddress); } // If we can't get a route within the timeout, give up for now. long quitAt = TimeUtils.toAbsoluteTimeMillis(MAX_FINDROUTE_TIMEOUT); // Time we need to wait before we can start issue a find route request // to give a chance for the async messenger to respond (success or failure) long findRouteAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT); EndpointAddress addr; while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) { // Then check if by any chance we can talk to it directly. Messenger directMessenger = ensureLocalRoute(peerAddress, hint); if (null != directMessenger) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Found direct route for " + peerAddress + " via " + directMessenger.getDestinationAddress()); } return peerAddress; } // Otherwise, look for a long route. // check if we got a hint. If that's the case use it RouteAdvertisement route; if (hint != null) { route = hint; } else { route = getRoute(peerAddress, seekRoute); } if (route != null && route.size() > 0) { addr = pid2addr(route.getLastHop().getPeerID()); if (ensureLocalRoute(addr, null) != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Found last hop remote address: " + peerAddress + " -> " + route.getLastHop().getPeerID()); } // Ensure local route removes negative cache info about // addr. We also need to remove that about peerAddress. return addr; } else { // need to try the first hop addr = pid2addr(route.getFirstHop().getPeerID()); if (ensureLocalRoute(addr, null) != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Found first hop remote address first hop: " + peerAddress + " -> " + route.getFirstHop().getPeerID()); } // Ensure local route removes negative cache info about addr. return addr; } else { removeRoute(peerID); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Found no reachable route to " + peerAddress); } } } } // For messages we didn't originate we don't seek routes. if (!seekRoute) { break; } // Check that route resolution is enabled if // not then bail out, there is nothing more // that we can do. if (!routeResolver.useRouteResolver()) { break; } // due to the asynchronous nature of getting our messenger we // need to handle the multi-entrance of issueing a route // discovery. A route discovery needs to be generated only // either if we have no pending request (it completed or we had // no information so we did not created one), or we tried and // we failed, or we waited at least ASYNC_MESSENGER_WAIT to get // a chance for the async request to respond before we can // issue the route discovery Long nextTry = triedAndFailed.get(peerID);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -