⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointrouter.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        PeerAdvertisement newPadv = group.getPeerAdvertisement();        int newModCount = newPadv.getModCount();        if ((lastPeerAdv != newPadv) || (lastModCount != newModCount) || (null == localRoute)) {            lastPeerAdv = newPadv;            lastModCount = newModCount;        } else {            // The current version is good.            return localRoute;        }        // Get its EndpointService advertisement        XMLElement endpParam = (XMLElement)                newPadv.getServiceParam(PeerGroup.endpointClassID);        if (endpParam == null) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.severe("no Endpoint SVC Params");            }            // Return whatever we had so far.            return localRoute;        }        // get the Route Advertisement element        Enumeration paramChilds = endpParam.getChildren(RouteAdvertisement.getAdvertisementType());        XMLElement param;        if (paramChilds.hasMoreElements()) {            param = (XMLElement) paramChilds.nextElement();        } else {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.severe("no Endpoint Route Adv");            }            // Return whatever we had so far.            return localRoute;        }        // build the new route        try {            // Stick the localPeerID in-there, since that was what            // every single caller of getMyLocalRoute did so far.            
RouteAdvertisement route = (RouteAdvertisement) AdvertisementFactory.newAdvertisement(param);            route.setDestPeerID(localPeerId);            localRoute = route;        } catch (Exception ex) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Failure extracting route", ex);            }        }        return localRoute;    }    /**     * listener object to synchronize on asynchronous getMessenger     */    private static class EndpointGetMessengerAsyncListener implements MessengerEventListener {        private final EndpointRouter router;        private final EndpointAddress logDest;        volatile boolean hasResponse = false;        volatile boolean isGone = false;        private Messenger messenger = null;        /**         * Constructor         *         * @param router the router         * @param dest   logical destination         */        EndpointGetMessengerAsyncListener(EndpointRouter router, EndpointAddress dest) {            this.router = router;            this.logDest = dest;        }        /**         * {@inheritDoc}         */        public boolean messengerReady(MessengerEvent event) {            Messenger toClose = null;            synchronized (this) {                hasResponse = true;                if (event != null) {                    messenger = event.getMessenger();                    if (null != messenger) {                        if(!logDest.equals(messenger.getLogicalDestinationAddress())) {                            // Ooops, wrong number !                            
if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                                LOG.warning("Incorrect Messenger logical destination : " + logDest + "!=" + messenger.getLogicalDestinationAddress());                            }                            toClose = messenger;                            messenger = null;                        }                    } else {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.warning("null messenger for dest :" + logDest);                        }                    }                } else {                    if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                        LOG.warning("null messenger event for dest :" + logDest);                    }                }            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                if (messenger == null) {                    LOG.fine("error creating messenger for dest :" + logDest);                } else {                    LOG.fine("got a new messenger for dest :" + logDest);                }            }            // We had to release the lock on THIS before we can get the lock on            // the router. (Or face a dead lock - we treat this as a lower level            // lock)            if (messenger == null) {                if (toClose != null) {                    toClose.close();                }                // we failed to get a messenger, we need to update the try and                // failed as it currently holds an infinite timeout to permit                // another thread to retry that destination. 
We only retry                // every MAX_ASYNC_GETMESSENGER_RETRY seconds                router.noMessenger(logDest);                synchronized (this) {                    // Only thing that can happen is that we notify for nothing                    // We took the lock when updating hasResult, so, the event                    // will not be missed.                     // FIXME It would be more logical to let the waiter do the                     // above if (!isGone) as in the case of success below.                     // However, we'll minimize changes for risk management                     // reasons.                    notify();                }                return false;            }            // It worked. Update router cache entry if we have to.            synchronized (this) {                if (!isGone) {                    notify(); // Waiter will do the rest.                    return true;                }            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("async caller gone add the messenger " + logDest);            }            return router.newMessenger(event);        }        /**         * Wait on the async call for ASYNC_MESSENGER_WAIT         * then bailout. 
The messenger will be added whenever         * the async getMessenger will return         *         * @param quick if true return a messenger immediately if available,         *              otherwise wait the Messenger resolution to be completed         * @return the Messenger if one available         */        public synchronized Messenger waitForMessenger(boolean quick) {            if (!quick) {                long quitAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT);                while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) {                    try {                        // check if we got a response already                        if (hasResponse) { // ok, we got a response                            break;                        }                        wait(ASYNC_MESSENGER_WAIT);                    } catch (InterruptedException woken) {                        Thread.interrupted();                        break;                    }                }            }            // mark the fact that the caller is bailing out            isGone = true;            return messenger;        }    }    /**     * isLocalRoute is a shallow test. It tells you that there used to be a     * local route that worked the last time it was tried.     *     * @param peerAddress Address of the destination who's route is desired.     * @return {@code true} if we know a direct route to the specified address     *         otherwise {@code false}.     */    boolean isLocalRoute(EndpointAddress peerAddress) {        return destinations.isCurrentlyReachable(peerAddress);    }    /**     * Get a Messenger for the specified destination if a direct route is known.     *     * @param peerAddress The peer who's messenger is desired.     * @param hint        A route hint to use if a new Messenger must be created.     * @return Messenger for direct route or {@code null} if none could be     *         found or created.     
*/    Messenger ensureLocalRoute(EndpointAddress peerAddress, RouteAdvertisement hint) {        // We need to make sure that there is a possible connection to that peer        // If we have a decent (not closed, busy or available) transport         // messenger in the pool, then we're done. Else we activly try to make         // one.        // See if we already have a messenger.        Messenger messenger = destinations.getCurrentMessenger(peerAddress);        if (messenger != null) {            return messenger;        }        // Ok, try and make one. Pass the route hint info        messenger = findReachableEndpoint(peerAddress, false, hint);        if (messenger == null) {            // We must also zap it from our positive cache: if we remembered it             // working, we should think again.            destinations.noOutgoingMessenger(peerAddress);            return null; // No way.        }        destinations.addOutgoingMessenger(peerAddress, messenger);        // We realy did bring something new. Give relief to those that have been        // waiting for it.        synchronized (this) {            notifyAll();        }        // NOTE to maintainers: Do not remove any negative cache info        // or route here. It is being managed by lower-level routines.        // The presence of a messenger in the pool has many origins,        // each case is different and taken care of by lower level        // routines.        return messenger;    }    /**     * Send a message to a given logical destination if it maps to some     * messenger in our messenger pool or if such a mapping can be found and     * added.     *     * @param destination peer-based address to send the message to.     * @param message     the message to be sent.     
* @throws java.io.IOException if an io error occurs     */    void sendOnLocalRoute(EndpointAddress destination, Message message) throws IOException {        IOException lastIoe = null;        Messenger sendVia;        // Try sending the message as long as we get have transport messengers        // to try with. They close when they fail, which puts them out of cache        // or pool if any, so we will not see a broken one a second time. We'll        // try the next one until we run out of options.        while ((sendVia = ensureLocalRoute(destination, null)) != null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Sending " + message + " to " + destination + " via " + sendVia);            }            try {                // FIXME 20040413 jice Maybe we should use the non-blocking mode                // and let excess messages be dropped given the threading issue                // still existing in the input circuit (while routing messages                // through).                sendVia.sendMessageB(message, EndpointRouter.ROUTER_SERVICE_NAME, null);                // If we reached that point, we're done.                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Sent " + message + " to " + destination);                }                return;            } catch (IOException ioe) {                // Can try again, with another messenger (most likely).                lastIoe = ioe;            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Trying next messenger to " + destination);            }            // try the next messenger if there is one.        }        // Now see why we're here.        // If we're here for no other reason than failing to get a messenger        // say so. Otherwise, report the failure from the last time we tried.        
if (lastIoe == null) {            lastIoe = new IOException("No reachable endpoints for " + destination);        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.log(Level.FINE, "Could not send to " + destination, lastIoe);        }        throw lastIoe;    }    /**     * Default constructor     */    public EndpointRouter() {    }    /**     * {@inheritDoc}     */    public void init(PeerGroup group, ID assignedID, Advertisement impl) throws PeerGroupException {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -