📄 endpointrouter.java
字号:
// Get its EndpointService advertisement TextElement endpParam = (TextElement) newPadv.getServiceParam(PeerGroup.endpointClassID); if (endpParam == null) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("getMyLocalRoute: no Endpoint SVC Params"); } // Return whatever we had so far. return localRoute; } // get the Route Advertisement element Enumeration paramChilds = endpParam.getChildren(RouteAdvertisement.getAdvertisementType()); Element param = null; if (paramChilds.hasMoreElements()) { param = (Element) paramChilds.nextElement(); } else { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("getMyLocalRoute: no Endpoint Route Adv"); } // Return whatever we had so far. return localRoute; } // build the new route try { // Stick the localPeerID in-there, since that was what // every single caller of getMyLocalRoute did so far. RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement((TextElement) param); route.setDestPeerID((PeerID) localPeerId); localRoute = route; } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("getMyLocalRoute: error extracting route", ex); } } return localRoute; } /** * listener object to synchronize on asynchronous getMessenger */ private static class EndpointGetMessengerAsyncListener implements MessengerEventListener { volatile boolean hasResponse = false; volatile boolean isGone = false; private Messenger messenger = null; private EndpointRouter router = null; private EndpointAddress logDest = null; /** * Constructor */ EndpointGetMessengerAsyncListener(EndpointRouter router, EndpointAddress logDest) { this.router = router; this.logDest = (EndpointAddress) logDest.clone(); } /** * {@inheritDoc} */ public boolean messengerReady(MessengerEvent event) { Messenger toClose = null; synchronized (this) { hasResponse = true; if (event != null) { messenger = event.getMessenger(); if (messenger != null && !messenger.getLogicalDestinationAddress().equals(logDest)) { // Ooops, wrong number ! toClose = messenger; messenger = null; } } else { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("null messenger event for dest :" + logDest); } } } if (LOG.isEnabledFor(Level.DEBUG)) { if (messenger == null) { LOG.debug("error creating messenger for dest :" + logDest); } else { LOG.debug("got a new messenger for dest :" + logDest); } } // We had to release the lock on THIS before we can get the lock // on the router. (Or face a dead lock - we treat this as a lower // level lock) if (messenger == null) { if (toClose != null) { toClose.close(); } // we failed to get a messenger, we need to update the try and // failed as it currently holds an infinite timeout to permit // another thread to retry that destination. We only retry // every MAXASYNC_GETMESSENGER_RETRY seconds router.noMessenger(logDest); synchronized (this) { // Only thing that can happen is that we notify for nothing // We took the lock when updating hasResult, so, the event // will not be missed. FIXME. It would be more logical // to let the waiter do the above if (!isGone) as in // the case of success below. However we'll // minimize changes for risk management reasons. notify(); } return false; } // It worked. Update router cache entry if we have to. synchronized (this) { if (!isGone) { notify(); // Waiter will do the rest. return true; } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("async caller gone add the messenger " + logDest); } return router.newMessenger(event); } /** * Wait on the async call for ASYNC_MESSENGER_WAIT * then bailout. The messenger will be added whenever * the async getMessenger will return */ public synchronized Messenger waitForMessenger(boolean quick) { if (!quick) { long quitAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT); while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) { try { // check if we got a response already if (hasResponse) { // ok, we got a response break; } wait(ASYNC_MESSENGER_WAIT); } catch (InterruptedException woken) { Thread.interrupted(); break; } } } // mark the fact that the caller is bailing out isGone = true; return messenger; } } /** * how long we are willing to wait for a response from an async * getMessenger. We do not wait long at all because it is non-critical * that we get the answer synchronously. The goal is to avoid starting * a route discovery if there's a chance to get a direct connection. * However, we will still take advantage of the direct route if it is * found while we wait for the route discovery result. If that happens, * the only wrong is that we used some bandwidth doing a route discovery * that wasn't needed after all. */ public final static long ASYNC_MESSENGER_WAIT = 3L * TimeUtils.ASECOND; /** * isLocalRoute is a shallow test. It tells you that there used to be * a local route that worked the last time it was tried. */ protected boolean isLocalRoute(EndpointAddress pId) { return destinations.isCurrentlyReachable(pId); } /** * Ensure there is a local route for a given peer id if it can at all be done. * * @param pId the peer who's route is desired. * @param hint specify a specific route hint to use * @return Messenger for local route. null if none could be found or created. */ protected Messenger ensureLocalRoute(EndpointAddress pId, Object hint) { // We need to make sure that there is a possible connection to that peer // If we have a decent (not closed, busy or available) transport messenger in // the pool, then we're done. Else we actively try to make one. // See if we already have a messenger Messenger m = destinations.getCurrentMessenger(pId); if (m != null) { return m; } // Ok, try and make one. Pass the route hint info m = findReachableEndpoint(pId, false, hint); if (m == null) { // We must also zap it from our positive cache: if we remembered it working, we should think again. destinations.noOutgoingMessenger(pId); return null; // No way. } destinations.addOutgoingMessenger(pId, m); // We realy did bring something new. Give relief to those that // have been waiting for it. synchronized (this) { notifyAll(); } // NOTE to maintainers: Do not remove any negative cache info // or route here. It is being managed by lower-level routines. // The presence of a messenger in the pool has many origins, // each case is different and taken care of by lower level // routines. return m; } /** * Send a message to a given logical destination if it maps to some * messenger in our messenger pool or if such a mapping can be found and * added. * * @param destination peer-based address to send the message to. * @param message the message to be sent. */ void sendOnLocalRoute(EndpointAddress destination, Message message) throws IOException { IOException lastIoe = null; Messenger wm; // Try as long as we get a transport messenger to try with. They close when they fail, which // puts them out of cache or pool if any, so we will not see a broken one a second time. We'll // try the next one until we run out of options. while ((wm = ensureLocalRoute(destination, null)) != null) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending to " + destination + " found a messenger"); } try { // FIXME - jice@jxta.org 20040413: May be we should use the non-blocking mode and let excess messages be dropped // given the threading issue still existing in the input circuit (while routing messages through). wm.sendMessageB(message, EndpointRouter.routerSName, null); // If we reached that point, we're done. if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending to " + destination + " worked"); } return; } catch (IOException ioe) { // Can try again, with another messenger (most likely). lastIoe = ioe; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Trying next messenger to " + destination); } // try the next messenger if there is one. } // Now see why we're here. // If we're here for no other reason than failing to get a messenger // say so. Otherwise, report the failure from the last time // we tried. if (lastIoe == null) { lastIoe = new IOException("No longer any reachable endpoints to destination."); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Could not send to " + destination, lastIoe); } throw (IOException) lastIoe; } /** * Default constructor */ public EndpointRouter() { // FIXME tra 20030818 Should be loaded as a service // when we have service dependency routeCM = new RouteCM(); // FIXME tra 20030818 Should be loaded as a service // when we have service dependency routeResolver = new RouteResolver(); } /** * {@inheritDoc} */ public void init(PeerGroup g, ID assignedID, Advertisement impl) throws PeerGroupException { timer.schedule(new TimerThreadNamer("EndpointRouter Timer for " + g.getPeerGroupID()), 0); group = g; ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl; endpoint = group.getEndpointService(); localPeerId = group.getPeerID(); localPeerAddr = new EndpointAddress(routerPName, group.getPeerID().getUniqueValue().toString(), null, null); destinations = new Destinations(endpoint); // initialize persistent CM route Cache // FIXME tra 20030818 Should be loaded as service when complete // refactoring is done. When loaded as a true service should not // have to pass the EnpointRouter object. The issue is we need // an api to obtain the real object from the PeerGroup API. routeCM.init(g, assignedID, impl, this); // initialize the route resolver // FIXME tra 20030818 Should be loaded as service when complete // refactoring is done. When loaded as a true service should not // have to pass the EnpointRouter object. The issue is we need // an api to obtain the real object from the PeerGroup API. routeResolver.init(g, assignedID, impl, this); endpoint.addIncomingMessageListener(this, routerSName, null); if (endpoint.addMessageTransport(this) == null) { throw new PeerGroupException("Transport registration refused"); } if (LOG.isEnabledFor(Level.INFO)) { StringBuffer configInfo = new StringBuffer("Configuring Router Transport : "+ assignedID);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -