📄 endpointrouter.java
字号:
PeerAdvertisement newPadv = group.getPeerAdvertisement(); int newModCount = newPadv.getModCount(); if ((lastPeerAdv != newPadv) || (lastModCount != newModCount) || (null == localRoute)) { lastPeerAdv = newPadv; lastModCount = newModCount; } else { // The current version is good. return localRoute; } // Get its EndpointService advertisement XMLElement endpParam = (XMLElement) newPadv.getServiceParam(PeerGroup.endpointClassID); if (endpParam == null) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("no Endpoint SVC Params"); } // Return whatever we had so far. return localRoute; } // get the Route Advertisement element Enumeration paramChilds = endpParam.getChildren(RouteAdvertisement.getAdvertisementType()); XMLElement param; if (paramChilds.hasMoreElements()) { param = (XMLElement) paramChilds.nextElement(); } else { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.severe("no Endpoint Route Adv"); } // Return whatever we had so far. return localRoute; } // build the new route try { // Stick the localPeerID in-there, since that was what // every single caller of getMyLocalRoute did so far. RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param); route.setDestPeerID(localPeerId); localRoute = route; } catch (Exception ex) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Failure extracting route", ex); } } return localRoute; } /** * listener object to synchronize on asynchronous getMessenger */ private static class EndpointGetMessengerAsyncListener implements MessengerEventListener { private final EndpointRouter router; private final EndpointAddress logDest; volatile boolean hasResponse = false; volatile boolean isGone = false; private Messenger messenger = null; /** * Constructor * * @param router the router * @param dest logical destination */ EndpointGetMessengerAsyncListener(EndpointRouter router, EndpointAddress dest) { this.router = router; this.logDest = dest; } /** * {@inheritDoc} */ public boolean messengerReady(MessengerEvent event) { Messenger toClose = null; synchronized (this) { hasResponse = true; if (event != null) { messenger = event.getMessenger(); if (null != messenger) { if(!logDest.equals(messenger.getLogicalDestinationAddress())) { // Ooops, wrong number ! if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Incorrect Messenger logical destination : " + logDest + "!=" + messenger.getLogicalDestinationAddress()); } toClose = messenger; messenger = null; } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("null messenger for dest :" + logDest); } } } else { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("null messenger event for dest :" + logDest); } } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (messenger == null) { LOG.fine("error creating messenger for dest :" + logDest); } else { LOG.fine("got a new messenger for dest :" + logDest); } } // We had to release the lock on THIS before we can get the lock on // the router. (Or face a dead lock - we treat this as a lower level // lock) if (messenger == null) { if (toClose != null) { toClose.close(); } // we failed to get a messenger, we need to update the try and // failed as it currently holds an infinite timeout to permit // another thread to retry that destination. We only retry // every MAX_ASYNC_GETMESSENGER_RETRY seconds router.noMessenger(logDest); synchronized (this) { // Only thing that can happen is that we notify for nothing // We took the lock when updating hasResult, so, the event // will not be missed. // FIXME It would be more logical to let the waiter do the // above if (!isGone) as in the case of success below. // However, we'll minimize changes for risk management // reasons. notify(); } return false; } // It worked. Update router cache entry if we have to. synchronized (this) { if (!isGone) { notify(); // Waiter will do the rest. return true; } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("async caller gone add the messenger " + logDest); } return router.newMessenger(event); } /** * Wait on the async call for ASYNC_MESSENGER_WAIT * then bailout. The messenger will be added whenever * the async getMessenger will return * * @param quick if true return a messenger immediately if available, * otherwise wait the Messenger resolution to be completed * @return the Messenger if one available */ public synchronized Messenger waitForMessenger(boolean quick) { if (!quick) { long quitAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT); while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) { try { // check if we got a response already if (hasResponse) { // ok, we got a response break; } wait(ASYNC_MESSENGER_WAIT); } catch (InterruptedException woken) { Thread.interrupted(); break; } } } // mark the fact that the caller is bailing out isGone = true; return messenger; } } /** * isLocalRoute is a shallow test. It tells you that there used to be a * local route that worked the last time it was tried. * * @param peerAddress Address of the destination who's route is desired. * @return {@code true} if we know a direct route to the specified address * otherwise {@code false}. */ boolean isLocalRoute(EndpointAddress peerAddress) { return destinations.isCurrentlyReachable(peerAddress); } /** * Get a Messenger for the specified destination if a direct route is known. * * @param peerAddress The peer who's messenger is desired. * @param hint A route hint to use if a new Messenger must be created. * @return Messenger for direct route or {@code null} if none could be * found or created. */ Messenger ensureLocalRoute(EndpointAddress peerAddress, RouteAdvertisement hint) { // We need to make sure that there is a possible connection to that peer // If we have a decent (not closed, busy or available) transport // messenger in the pool, then we're done. Else we activly try to make // one. // See if we already have a messenger. Messenger messenger = destinations.getCurrentMessenger(peerAddress); if (messenger != null) { return messenger; } // Ok, try and make one. Pass the route hint info messenger = findReachableEndpoint(peerAddress, false, hint); if (messenger == null) { // We must also zap it from our positive cache: if we remembered it // working, we should think again. destinations.noOutgoingMessenger(peerAddress); return null; // No way. } destinations.addOutgoingMessenger(peerAddress, messenger); // We realy did bring something new. Give relief to those that have been // waiting for it. synchronized (this) { notifyAll(); } // NOTE to maintainers: Do not remove any negative cache info // or route here. It is being managed by lower-level routines. // The presence of a messenger in the pool has many origins, // each case is different and taken care of by lower level // routines. return messenger; } /** * Send a message to a given logical destination if it maps to some * messenger in our messenger pool or if such a mapping can be found and * added. * * @param destination peer-based address to send the message to. * @param message the message to be sent. * @throws java.io.IOException if an io error occurs */ void sendOnLocalRoute(EndpointAddress destination, Message message) throws IOException { IOException lastIoe = null; Messenger sendVia; // Try sending the message as long as we get have transport messengers // to try with. They close when they fail, which puts them out of cache // or pool if any, so we will not see a broken one a second time. We'll // try the next one until we run out of options. while ((sendVia = ensureLocalRoute(destination, null)) != null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + message + " to " + destination + " via " + sendVia); } try { // FIXME 20040413 jice Maybe we should use the non-blocking mode // and let excess messages be dropped given the threading issue // still existing in the input circuit (while routing messages // through). sendVia.sendMessageB(message, EndpointRouter.ROUTER_SERVICE_NAME, null); // If we reached that point, we're done. if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent " + message + " to " + destination); } return; } catch (IOException ioe) { // Can try again, with another messenger (most likely). lastIoe = ioe; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Trying next messenger to " + destination); } // try the next messenger if there is one. } // Now see why we're here. // If we're here for no other reason than failing to get a messenger // say so. Otherwise, report the failure from the last time we tried. if (lastIoe == null) { lastIoe = new IOException("No reachable endpoints for " + destination); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "Could not send to " + destination, lastIoe); } throw lastIoe; } /** * Default constructor */ public EndpointRouter() { } /** * {@inheritDoc} */ public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -