⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointrouter.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
        // Get its EndpointService advertisement        TextElement endpParam = (TextElement)                                newPadv.getServiceParam(PeerGroup.endpointClassID);        if (endpParam == null) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("getMyLocalRoute: no Endpoint SVC Params");            }            // Return whatever we had so far.            return localRoute;        }        // get the Route Advertisement element        Enumeration paramChilds = endpParam.getChildren(RouteAdvertisement.getAdvertisementType());        Element param = null;        if (paramChilds.hasMoreElements()) {            param = (Element) paramChilds.nextElement();        } else {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("getMyLocalRoute: no Endpoint Route Adv");            }            // Return whatever we had so far.            return localRoute;        }        // build the new route        try {            // Stick the localPeerID in-there, since that was what            // every single caller of getMyLocalRoute did so far.            RouteAdvertisement route = (RouteAdvertisement)                                       AdvertisementFactory.newAdvertisement((TextElement) param);            route.setDestPeerID((PeerID) localPeerId);            localRoute = route;        } catch (Exception ex) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("getMyLocalRoute: error extracting route", ex);            }        }        return localRoute;    }    /**     *  listener object to synchronize on asynchronous getMessenger     */    private static class EndpointGetMessengerAsyncListener implements MessengerEventListener {        volatile boolean hasResponse = false;        volatile boolean isGone = false;        private Messenger messenger = null;        private EndpointRouter router = null;        private EndpointAddress logDest = null;        /**         * Constructor         */        EndpointGetMessengerAsyncListener(EndpointRouter router, EndpointAddress logDest) {            this.router = router;            this.logDest = (EndpointAddress) logDest.clone();        }        /**         *  {@inheritDoc}         */        public boolean messengerReady(MessengerEvent event) {            Messenger toClose = null;            synchronized (this) {                hasResponse = true;                if (event != null) {                    messenger = event.getMessenger();                    if (messenger != null && !messenger.getLogicalDestinationAddress().equals(logDest)) {                        // Ooops, wrong number !                        toClose = messenger;                        messenger = null;                    }                } else {                    if (LOG.isEnabledFor(Level.WARN)) {                        LOG.warn("null messenger event for dest :" + logDest);                    }                }            }            if (LOG.isEnabledFor(Level.DEBUG)) {                if (messenger == null) {                    LOG.debug("error creating messenger for dest :" + logDest);                } else {                    LOG.debug("got a new messenger for dest :" + logDest);                }            }            // We had to release the lock on THIS before we can get the lock            // on the router. (Or face a dead lock - we treat this as a lower            // level lock)            if (messenger == null) {                if (toClose != null) {                    toClose.close();                }                // we failed to get a messenger, we need to update the try and                // failed as it currently holds an infinite timeout to permit                // another thread to retry that destination. We only retry                // every MAXASYNC_GETMESSENGER_RETRY seconds                router.noMessenger(logDest);                synchronized (this) {                    // Only thing that can happen is that we notify for nothing                    // We took the lock when updating hasResult, so, the event                    // will not be missed. FIXME. It would be more logical                    // to let the waiter do the above if (!isGone) as in                    // the case of success below. However we'll                    // minimize changes for risk management reasons.                    notify();                }                return false;            }            // It worked. Update router cache entry if we have to.            synchronized (this) {                if (!isGone) {                    notify(); // Waiter will do the rest.                    return true;                }            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("async caller gone add the messenger " + logDest);            }            return router.newMessenger(event);        }        /**         * Wait on the async call for ASYNC_MESSENGER_WAIT         * then bailout. The messenger will be added whenever         * the async getMessenger will return         */        public synchronized Messenger waitForMessenger(boolean quick) {            if (!quick) {                long quitAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT);                while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) {                    try {                        // check if we got a response already                        if (hasResponse) { // ok, we got a response                            break;                        }                        wait(ASYNC_MESSENGER_WAIT);                    } catch (InterruptedException woken) {                        Thread.interrupted();                        break;                    }                }            }            // mark the fact that the caller is bailing out            isGone = true;            return messenger;        }    }    /**     * how long we are willing to wait for a response from an async     * getMessenger. We do not wait long at all because it is non-critical     * that we get the answer synchronously. The goal is to avoid starting     * a route discovery if there's a chance to get a direct connection.     * However, we will still take advantage of the direct route if it is     * found while we wait for the route discovery result. If that happens,     * the only wrong is that we used some bandwidth doing a route discovery     * that wasn't needed after all.     */    public final static long ASYNC_MESSENGER_WAIT = 3L * TimeUtils.ASECOND;    /**     * isLocalRoute is a shallow test. It tells you that there used to be     * a local route that worked the last time it was tried.     */    protected boolean isLocalRoute(EndpointAddress pId) {        return destinations.isCurrentlyReachable(pId);    }    /**     *  Ensure there is a local route for a given peer id if it can at all be done.     *     *  @param pId  the peer who's route is desired.     *  @param hint specify a specific route hint to use     *  @return Messenger for local route. null if none could be found or created.     */    protected Messenger ensureLocalRoute(EndpointAddress pId, Object hint) {        // We need to make sure that there is a possible connection to that peer        // If we have a decent (not closed, busy or available) transport messenger in        // the pool, then we're done. Else we actively try to make one.        // See if we already have a messenger        Messenger m = destinations.getCurrentMessenger(pId);        if (m != null) {            return m;        }        // Ok, try and make one. Pass the route hint info        m = findReachableEndpoint(pId, false, hint);        if (m == null) {            // We must also zap it from our positive cache: if we remembered it working, we should think again.            destinations.noOutgoingMessenger(pId);            return null; // No way.        }        destinations.addOutgoingMessenger(pId, m);        // We realy did bring something new. Give relief to those that        // have been waiting for it.        synchronized (this) {            notifyAll();        }        // NOTE to maintainers: Do not remove any negative cache info        // or route here. It is being managed by lower-level routines.        // The presence of a messenger in the pool has many origins,        // each case is different and taken care of by lower level        // routines.        return m;    }    /**     *  Send a message to a given logical destination if it maps to some     *  messenger in our messenger pool or if such a mapping can be found and     *  added.     *     *  @param destination peer-based address to send the message to.     *  @param message the message to be sent.     */    void sendOnLocalRoute(EndpointAddress destination, Message message)    throws IOException {        IOException lastIoe = null;        Messenger wm;        // Try as long as we get a transport messenger to try with. They close when they fail, which        // puts them out of cache or pool if any, so we will not see a broken one a second time. We'll        // try the next one until we run out of options.        while ((wm = ensureLocalRoute(destination, null)) != null) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Sending to " + destination + " found a messenger");            }            try {                // FIXME - jice@jxta.org 20040413: May be we should use the non-blocking mode and let excess messages be dropped                // given the threading issue still existing in the input circuit (while routing messages through).                wm.sendMessageB(message, EndpointRouter.routerSName, null);                // If we reached that point, we're done.                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Sending to " + destination + " worked");                }                return;            } catch (IOException ioe) {                // Can try again, with another messenger (most likely).                lastIoe = ioe;            }            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Trying next messenger to " + destination);            }            // try the next messenger if there is one.        }        // Now see why we're here.        // If we're here for no other reason than failing to get a messenger        // say so. Otherwise, report the failure from the last time        // we tried.        if (lastIoe == null) {            lastIoe = new IOException("No longer any reachable endpoints to destination.");        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Could not send to " + destination, lastIoe);        }        throw (IOException) lastIoe;    }    /**     *  Default constructor     */    public EndpointRouter() {        // FIXME tra 20030818  Should be loaded as a service        // when we have service dependency        routeCM = new RouteCM();        // FIXME tra 20030818  Should be loaded as a service        // when we have service dependency        routeResolver = new RouteResolver();    }    /**     *  {@inheritDoc}     */    public void init(PeerGroup g, ID assignedID, Advertisement impl)    throws PeerGroupException {        timer.schedule(new TimerThreadNamer("EndpointRouter Timer for " + g.getPeerGroupID()), 0);        group = g;        ModuleImplAdvertisement implAdvertisement = (ModuleImplAdvertisement) impl;        endpoint = group.getEndpointService();        localPeerId = group.getPeerID();        localPeerAddr = new EndpointAddress(routerPName, group.getPeerID().getUniqueValue().toString(), null, null);        destinations = new Destinations(endpoint);        // initialize persistent CM route Cache        // FIXME tra 20030818 Should be loaded as service when complete        // refactoring is done. When loaded as a true service should not        // have to pass the EnpointRouter object. The issue is we need        // an api to obtain the real object from the PeerGroup API.        routeCM.init(g, assignedID, impl, this);        // initialize the route resolver        // FIXME tra 20030818 Should be loaded as service when complete        // refactoring is done. When loaded as a true service should not        // have to pass the EnpointRouter object. The issue is we need        // an api to obtain the real object from the PeerGroup API.        routeResolver.init(g, assignedID, impl, this);        endpoint.addIncomingMessageListener(this, routerSName, null);        if (endpoint.addMessageTransport(this) == null) {            throw new PeerGroupException("Transport registration refused");        }        if (LOG.isEnabledFor(Level.INFO)) {            StringBuffer configInfo = new StringBuffer("Configuring Router Transport : "+ assignedID);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -