
📄 endpointrouter.java

📁 jxta_src_2.41b: JXTA 2.41b latest source code, from www.jxta.org
💻 JAVA
📖 Page 1 of 5
            if (implAdvertisement != null) {
                configInfo.append("\n\tImplementation :");
                configInfo.append("\n\t\tModule Spec ID: " + implAdvertisement.getModuleSpecID());
                configInfo.append("\n\t\tImpl Description : " + implAdvertisement.getDescription());
                configInfo.append("\n\t\tImpl URI : " + implAdvertisement.getUri());
                configInfo.append("\n\t\tImpl Code : " + implAdvertisement.getCode());
            }

            configInfo.append("\n\tGroup Params :");
            configInfo.append("\n\t\tGroup : " + group.getPeerGroupName());
            configInfo.append("\n\t\tGroup ID : " + group.getPeerGroupID());
            configInfo.append("\n\t\tPeer ID : " + group.getPeerID());

            configInfo.append("\n\tConfiguration :");
            configInfo.append("\n\t\tProtocol : " + getProtocolName() );
            configInfo.append("\n\t\tPublic Address : " + localPeerAddr);
            configInfo.append("\n\t\tUse Cm : " + routeCM.useRouteCM());
            configInfo.append("\n\t\tUse RouteResolver : " + routeResolver.useRouteResolver());

            LOG.info(configInfo);
        }
    }

    /**
     *  {@inheritDoc}
     */
    public int startApp(String[] arg) {
        int status = 0;

        // FIXME tra 20031015 Should be started as a service
        // when refactored work completed
        status = routeCM.startApp(arg);
        if (status != 0) {
            return status;
        }

        // FIXME tra 20031015 is there a risk for double
        // registration  when startApp() is recalled
        // due to failure to get the discovery service
        // in the previous statement.

        // NOTE: Endpoint needs to be registered before
        // we register the endpoint resolver. This is
        // bringing a more complex issue of service
        // loading dependencies.
        endpoint.addMessengerEventListener(this, EndpointService.MediumPrecedence);

        // FIXME tra 20031015 Should be started as a service
        // when refactored completed
        status = routeResolver.startApp(arg);
        if (status != 0) {
            return status;
        }

        // publish my local route adv
        routeCM.publishRoute(getMyLocalRoute());

        if (LOG.isEnabledFor(Level.INFO)) {
            LOG.info("Router Message Transport started");
        }

        return status;
    }

    /**
     *  {@inheritDoc}
     *
     * <p/>Careful that stopApp() could in theory be called before startApp().
     */
    public void stopApp() {
        if (endpoint != null) {
            endpoint.removeIncomingMessageListener(routerSName, null);
            endpoint.removeMessengerEventListener(this, EndpointService.MediumPrecedence);
            endpoint.removeMessageTransport(this);
            endpoint = null;
        }

        // FIXME tra 20030818 should be unloaded as a service
        routeCM.stopApp();

        // FIXME tra 20030818 should be unloaded as a service
        routeResolver.stopApp();

        destinations.close();
        timer.cancel();

        if (LOG.isEnabledFor(Level.INFO)) {
            LOG.info("Router Message Transport stopped");
        }
    }

    /**
     *  {@inheritDoc}
     */
    public boolean isConnectionOriented() {
        return false;
    }

    /**
     *  {@inheritDoc}
     */
    public boolean allowsRouting() {
        // Yes, this is the router, and it does not allow routing.
        // Otherwise we would have a chicken and egg problem.
        return false;
    }

    /**
     *  {@inheritDoc}
     */
    public EndpointService getEndpointService() {
        return endpoint;
    }

    /**
     *  {@inheritDoc}
     */
    public EndpointAddress getPublicAddress() {
        return (EndpointAddress) localPeerAddr.clone();
    }

    /**
     *  {@inheritDoc}
     */
    public Iterator getPublicAddresses() {
        return Collections.singletonList(getPublicAddress()).iterator();
    }

    /**
     *  {@inheritDoc}
     */
    public String getProtocolName() {
        return routerPName;
    }

    /**
     *  {@inheritDoc}
     */
    public boolean isPropagateEnabled() {
        return false;
    }

    /**
     *  {@inheritDoc}
     */
    public boolean isPropagationSupported() {
        return false;
    }

    /**
     *  {@inheritDoc}
     */
    public void propagate(Message srcMsg, String pName, String pParam, String prunePeer) throws IOException {
        // All messages are lost in the ether
    }

    /**
     * Given a peer id, return an address to reach that peer.
     * The address may be for a directly reachable peer, or
     * for the first gateway along a route to reach the peer.
     * If we do not have a route to the peer, we will use the
     * Peer Routing Protocol to try to discover one.  We will
     * wait up to 30 seconds for a route to be discovered.
     *
     * @param dest the peer we are trying to reach.
     * @param seekRoute whether to go as far as issuing a route query, or just fish in our cache.
     * when forwarding a message we allow ourselves to mend a broken source-issued route but we
     * won't go as far as seeking one from other peers. When originating a message, on the other end
     * we will aggressively try to find route.
     * @param hint whether we are passed a route hint to be used, in that case that route
     * hint should be used
     *
     * @return an EndpointAddress at which that peer should be reachable.
     */
    EndpointAddress getGatewayAddress(EndpointAddress dest, boolean seekRoute) {
        return getGatewayAddress(dest, seekRoute, null);
    }

    EndpointAddress getGatewayAddress(EndpointAddress dest, boolean seekRoute, Object hint) {
        try {
            EndpointAddress pId = new EndpointAddress(dest, null, null);

            // FIXME: jice@jxta.org - 20021215 replace that junk with a background
            // task; separate the timings of route disco from the timeouts of
            // the requesting threads. EndpointAddress result = null;
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Searching local" + (seekRoute ? " & remote" : "") + " for route for " + pId);
            }

            // If we can't get a route within the timeout, give up for now.
            long quitAt = TimeUtils.toAbsoluteTimeMillis(MAXFINDROUTE_TIMEOUT);

            // Time we need to wait before we can start issue a find route request
            // to give a chance for the async messenger to respond (success or failure)
            long findRouteAt = TimeUtils.toAbsoluteTimeMillis(ASYNC_MESSENGER_WAIT);

            EndpointAddress addr = null;

            while (TimeUtils.toRelativeTimeMillis(quitAt) > 0) {
                // Then check if by any chance we can talk to it directly.
                if (ensureLocalRoute(pId, hint) != null) {
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("Found direct address " + pId);
                    }
                    return pId;
                }

                // Otherwise, look for a long route.
                // check if we got a hint. If that's the case use it
                RouteAdvertisement route = null;
                if (hint != null) {
                    route = (RouteAdvertisement) hint;
                } else {
                    route = getRoute(pId, seekRoute);
                }

                if (route != null && route.size() > 0) {
                    addr = pid2addr(route.getLastHop().getPeerID());
                    if (ensureLocalRoute(addr, null) != null) {
                        if (LOG.isEnabledFor(Level.DEBUG)) {
                            LOG.debug("Found last hop remote address: " + pId + " -> " + route.getLastHop().getPeerID());
                        }
                        // Ensure local route removes negative cache info about
                        // addr. We also need to remove that about pId.
                        return addr;
                    } else { // need to try the first hop
                        addr = pid2addr(route.getFirstHop().getPeerID());
                        if (ensureLocalRoute(addr, null) != null) {
                            if (LOG.isEnabledFor(Level.DEBUG)) {
                                LOG.debug("Found first hop remote address first hop: " + pId + " -> " + route.getFirstHop().getPeerID());
                            }
                            // Ensure local route removes negative cache info about
                            // addr.
                            return addr;
                        } else {
                            removeRoute(pId);
                            if (LOG.isEnabledFor(Level.DEBUG)) {
                                LOG.debug("Found no reachable route to " + pId);
                            }
                        }
                    }
                }

                // For messages we didn't originate we don't seek routes.
                if (!seekRoute) {
                    break;
                }

                // Check that route resolution is enabled if
                // not then bail out, there is nothing more
                // that we can do.
                if (!routeResolver.useRouteResolver()) {
                    break;
                }

                // due to the asynchronous nature of getting our messenger we
                // need to handle the multi-entrance of issuing a route
                // discovery. A route discovery needs to be generated only
                // either if we have no pending request (it completed or we had
                // no information so we did not create one), or we tried and
                // we failed, or we waited at least ASYNC_MESSENGER_WAIT to get
                // a chance for the async request to respond before we can
                // issue the route discovery
                Long nextTry = (Long) triedAndFailed.get(pId);
                if ((nextTry == null) || (nextTry.longValue() < TimeUtils.toAbsoluteTimeMillis(MAXASYNC_GETMESSENGER_RETRY))
                    || (TimeUtils.toRelativeTimeMillis(findRouteAt) <= 0)) {

                    // If it is already hopeless (negative cache), just give up.
                    // Otherwise, try and recover the route. If a query is not
                    // already pending, we may trigger a route discovery before we
                    // wait. Else, just wait. The main problem we have here is that
                    // the same may re-enter because the resolver query sent by
                    // findRoute ends up with the rendezvous service trying to
                    // resolve the same destination if the destination happens to be
                    // the start of the walk. In that situation we will re-enter
                    // at every findRoute attempt until the query becomes "failed".
                    // However, we do want to do more than one findRoute because
                    // just one attempt can fail for totally fortuitous or temporary
                    // reasons. A tradeoff is to do a very limited number of attempts
                    // but still more than one. Over the minute for which the query
                    // is not failed, isTimeToRetry will return true at most twice
                    // so that'll be a total of three attempts: once every 20 seconds.
                    boolean doFind = false;
                    ClearPendingQuery t = null;

                    synchronized (this) {
                        t = (ClearPendingQuery) pendingQueries.get(pId);

                        if (t == null) {
                            doFind = true;
                            t = new ClearPendingQuery(pId);
                            pendingQueries.put(pId, t);
                        } else {
                            if (t.isFailed()) {
                                break;
                            }
                            if (t.isTimeToRetry()) {
                                doFind = true;
                            }
                        }
                    }

                    // protect against the async messenger request. We only
                    // look for a route after the first iteration by
                    // that time we will have bailed out from the async call
                    if (doFind) {
                        routeResolver.findRoute(pId);
                        // we do not need to check the CM, route table will
                        // be updated when the route response arrives. This reduces
                        // CM activities when we wait for the route response
                        seekRoute = false;
                    }
                }

                // Now, wait. Responses to our query may occur asynchronously.
                // threads.
                synchronized (this) {
                    // We can't possibly do everything above while synchronized,
                    // so we could miss an event of interest. But some changes
                    // are not readily noticeable anyway, so we must wake up
                    // every so often to retry.
                    try {
                        // we only need to wait if we haven't got a messenger
                        // yet.
                        if (destinations.getCurrentMessenger(pId) == null) {
                            wait(ASYNC_MESSENGER_WAIT);
                        }
                    } catch (InterruptedException woken) {
                        Thread.interrupted();
                    }
                }
            }

            if (LOG.isEnabledFor(Level.DEBUG)) {
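
The retry budget described in the comments of getGatewayAddress (at most two isTimeToRetry() hits during the minute before a query counts as failed, so three findRoute attempts roughly 20 seconds apart) can be illustrated with a small standalone sketch. ClearPendingQuery itself is not on this page of the listing, so everything below is an assumption made for illustration: the class name PendingQuerySketch, the two constants, and the injected clock are hypothetical and are not the JXTA implementation.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the pending-query bookkeeping; not the JXTA class. */
final class PendingQuerySketch {
    // Assumed values, taken from the wording of the source comment only.
    static final long RETRY_INTERVAL_MS = 20_000L; // one attempt every 20 seconds
    static final long FAIL_AFTER_MS = 60_000L;     // query counts as failed after a minute

    private final LongSupplier clock; // injected so the policy can be tested without sleeping
    private final long failAt;
    private long nextRetryAt;

    /** Creating the entry corresponds to the first findRoute attempt. */
    PendingQuerySketch(LongSupplier clock) {
        this.clock = clock;
        long now = clock.getAsLong();
        this.failAt = now + FAIL_AFTER_MS;
        this.nextRetryAt = now + RETRY_INTERVAL_MS;
    }

    /** True once the whole retry window has elapsed. */
    synchronized boolean isFailed() {
        return clock.getAsLong() >= failAt;
    }

    /** True at most once per interval, and never after the query has failed. */
    synchronized boolean isTimeToRetry() {
        long now = clock.getAsLong();
        if (now >= failAt || now < nextRetryAt) {
            return false;
        }
        nextRetryAt = now + RETRY_INTERVAL_MS;
        return true;
    }
}
```

With these assumed constants a caller that polls continuously sees a retry at 20 s and at 40 s; the third interval ends exactly when the query fails, so no further attempt is made.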

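
The wait at the bottom of the loop combines an overall deadline (quitAt) with short timed waits, because a notification can be missed while the thread is working outside the synchronized block. A minimal sketch of that pattern follows. The names DeadlineWaitSketch, awaitCondition, and signalChange are hypothetical, and unlike the listing, which clears the interrupt flag with Thread.interrupted(), this sketch restores the flag and gives up; that difference is a deliberate choice here, not a description of JXTA behaviour.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of a deadline-bounded polling wait; not JXTA code. */
final class DeadlineWaitSketch {
    private final Object lock = new Object();

    /** Wakes any waiter so it re-checks its condition immediately. */
    void signalChange() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /**
     * Re-checks {@code ready} until it holds or {@code timeoutMs} has elapsed.
     * Each wait is capped at {@code pollMs}, so a missed notification only
     * delays the next check instead of blocking until the deadline.
     */
    boolean awaitCondition(BooleanSupplier ready, long timeoutMs, long pollMs) {
        if (pollMs <= 0) {
            throw new IllegalArgumentException("pollMs must be positive");
        }
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false; // deadline reached without the condition holding
            }
            synchronized (lock) {
                try {
                    // Never pass 0 here: Object.wait(0) waits indefinitely.
                    lock.wait(Math.min(pollMs, remainingMs));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    return false;
                }
            }
        }
    }
}
```

The condition is evaluated outside the lock, mirroring the listing's remark that not everything can be done while synchronized; the periodic wake-up is what makes that safe.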