📄 destinations.java
字号:
// If it is gone or broken, try and get a new one. if ((messenger == null) || ((messenger.getState() & Messenger.USABLE) == 0)) { messenger = endpoint.getMessengerImmediate(xportDest, null); // If this fails, it is hopeless: the address is bad or something like that. Make ourselves expired right away. if (messenger == null) { outgoingMessenger = null; xportDest = null; expiresAt = 0; return null; } // Renew the ref. The xportDest is the same. outgoingMessenger = new SoftReference<Messenger>(messenger); } // So we had one or could renew. But, does it work ? if ((messenger.getState() & (Messenger.USABLE & Messenger.RESOLVED)) == 0) { // We no-longer have the underlying connection. Let ourselves expire. Do not renew the expiration time. outgoingMessenger = null; xportDest = null; return null; } // Ok, we do have an outgoing messenger at the ready after all. expiresAt = TimeUtils.toAbsoluteTimeMillis(EXPIRATION); return messenger; } /** * Returns a channel for this destination if one is there or can be obtained * readily and works. * <p/> * <p/>We prefer the incoming connection to the outgoing for two * reasons: * <ul> * <li>The remote peer was able to reach us. We cannot be sure that * we can reach the remote peer.</li> * <li>The remote peer initiated the connection. It has a better * sense of when the connection should be closed or reopened than * we do.</li> * * @return a channel for this destination */ Messenger getCurrentMessenger() { Messenger res = getIncoming(); if (res != null) { return res; } return getOutgoing(); } /** * @return true if we do have an outgoing messenger or, failing that, we had one not too long ago. */ boolean isNormallyReachable() { return ((getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0)); } /** * We think the destination is reachable somehow. Not sure how long. * * @return true if we have any kind of messenger or, failing that, we had an outgoing one not too long ago. */ boolean isCurrentlyReachable() { return ((getIncoming() != null) || (getOutgoing() != null) || (TimeUtils.toRelativeTimeMillis(expiresAt) >= 0)); } /** * @return true if this wisdom carries no positive information whatsoever. */ boolean isExpired() { return !isCurrentlyReachable(); } } /* * Internal mechanisms */ /** * Return any Wisdom for the specified destination. The address will * be normalized to the base form. * * @param destination The address of the wisdom that is sought. * @return The Wisdom for this address or {@code null} if no Wisdom found. */ private Wisdom getWisdom(EndpointAddress destination) { if (destination.getServiceName() != null) { destination = new EndpointAddress(destination, null, null); } return wisdoms.get(destination); } /** * Add a Wisdom for the specified destination. The address will * be normalized to the base form. * * @param destination The address of the Wisdom that is being added. * @param wisdom The Wisdom for this address to be added to the map. */ private void addWisdom(EndpointAddress destination, Wisdom wisdom) { destination = new EndpointAddress(destination, null, null); wisdoms.put(destination, wisdom); } /* * General house keeping. */ public Destinations(EndpointService endpoint) { this.endpoint = endpoint; wisdomGC = new WisdomGCTask(); cleanup.schedule(wisdomGC, TimeUtils.AMINUTE, TimeUtils.AMINUTE); } /** * Shutdown this cache. (stop the gc) */ public synchronized void close() { stopped = true; // forget everything. wisdoms.clear(); wisdomGC.cancel(); } /** * Handles cleanup of expired wisdoms */ class WisdomGCTask extends TimerTask { /** * {@inheritDoc} * * <p/>garbage collector. We use soft references to messengers, but we use * a strong hashmap to keep the wisdom around in a more predictable * manner. Entries are simply removed when they no-longer carry * relevant information; so there's no change in the total meaning of * the map when an entry is removed. */ @Override public void run() { try { synchronized (Destinations.this) { Iterator<Wisdom> eachWisdom = wisdoms.values().iterator(); while (eachWisdom.hasNext()) { Wisdom w = eachWisdom.next(); if (w.isExpired()) { eachWisdom.remove(); } } } } catch (Throwable all) { if (Logging.SHOW_SEVERE && Destinations.LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Uncaught Throwable in TimerTask :" + Thread.currentThread().getName(), all); } } } } public synchronized Collection<EndpointAddress> allDestinations() { Set<EndpointAddress> allKeys = wisdoms.keySet(); List<EndpointAddress> res = new ArrayList<EndpointAddress>(allKeys); return res; } /* * information output */ /** * If there is a messenger at hand (incoming or otherwise), return it. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @return A messenger to that destination if a resolved and usable one is available or can be made instantly. null otherwise. */ public synchronized Messenger getCurrentMessenger(EndpointAddress destination) { Wisdom wisdom = getWisdom(destination); if (wisdom == null) { return null; } return wisdom.getCurrentMessenger(); } /** * Is it likely that one can be made from this end. (the last attempt succeeded, not only incoming, and that was not long ago) ? * This is a conservative test. It means that declaring that we can route to that destination is a very safe bet, as opposed * to isNormallyReachable and getCurrentMessenger, which could be misleading if the only messenger we can ever get is incoming. * Not currently used. Should likely be. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @return true if it is likely that we can get a messenger to that destination in the future. */ public synchronized boolean isNormallyReachable(EndpointAddress destination) { Wisdom wisdom = getWisdom(destination); return ((wisdom != null) && wisdom.isNormallyReachable()); } /** * Do we already have a messenger or is it likely that we can make one? * We is will return {@code true} more often than * {@code isNormallyReachable()} since it can be true even when all we have * is an incoming messenger. * * <p/>Just testing that there is an entry is no-longer the same because we * may keep the entries beyond the point where we would keep them before, so * that we can add some longer-lived information in the future, and do not * interfere as much with the gc thread. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @return true is we are confident that we can obtain a messenger, either because we can get one instantly, or because * this destination is normally reachable. (So, it is ok to try and route to that destination, now). */ public synchronized boolean isCurrentlyReachable(EndpointAddress destination) { Wisdom wisdom = getWisdom(destination); return ((wisdom != null) && wisdom.isCurrentlyReachable()); } /** * Are we supposed to send a welcome to that destination (we can't remember having done it). * It is assumed that once true was returned, it will be acted upon. So, true is not returned a second time. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @return true if this a destination to whish we can't remember sending a welcome message. */ public synchronized boolean isWelcomeNeeded(EndpointAddress destination) { Wisdom wisdom = getWisdom(destination); return ((wisdom != null) && wisdom.isWelcomeNeeded()); } /* * information input. */ /** * Here is a messenger that we were able to obtain. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @param messenger The incoming messenger for that destination. * @return true if this messenger was added (keep it open). false otherwise (do what you want with it). */ public synchronized boolean addOutgoingMessenger(EndpointAddress destination, Messenger messenger) { Wisdom wisdom = getWisdom(destination); if (wisdom != null) { return wisdom.addOutgoingMessenger(messenger); } addWisdom(destination, new Wisdom(messenger, false)); return true; } /** * Here is an incoming messenger that just popped out. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). * @param messenger The incoming messenger for that destination. * @return true if this messenger was added (keep it open). false otherwise (do what you want with it). */ public synchronized boolean addIncomingMessenger(EndpointAddress destination, Messenger messenger) { Wisdom wisdom = getWisdom(destination); if (wisdom != null) { return wisdom.addIncomingMessenger(messenger); } addWisdom(destination, new Wisdom(messenger, true)); return true; } /** * We tried to get a messenger but could not. We know that we do not have connectivity from our end, for now. we may still * have an incoming. However, if we had to try and make a messenger, there probably isn't an incoming, but that's not our * business here. isNormallyReachable becomes false; but we can still try when solicited. * * @param destination The destination as an endpoint address (is automatically normalized to protocol and address only). */ public synchronized void noOutgoingMessenger(EndpointAddress destination) { Wisdom wisdom = getWisdom(destination); if (wisdom != null) { wisdom.noOutgoingMessenger(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -