
routeresolver.java

jxta_src_2.41b: jxta 2.41b latest source code, from www.jxta.org (Java)
            // construct a response from the query
            ResolverResponseMsg res = query.makeResponse();

            res.setCredential(credentialDoc);
            res.setResponse(routeResponse.toString());

            resolver.sendResponse(query.getSrc(), res);
            return ResolverService.OK;

        } catch (Exception ee) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("processQuery: error while processing query ", ee);
            }
            return ResolverService.OK;
        }
    }

    /**
     * Return a route error in case a route was found to be invalid,
     * because the current hop cannot find a way to forward the message to the
     * destination or to any other hop in the forward part of the route.
     * In that case a negative route response is forwarded
     * to the original source of the message. Of course we
     * do not have any way to guarantee that the NACK message will be
     * received by the sender, but the best we can do is to try :-)
     *
     * We set the query ID to NACKROUTE_QUERYID to indicate
     * that this is a bad route.
     *
     * @param src original source of the message
     * @param dest original destination of the message
     * @param origHops hops of the route that was found to be invalid
     */
    protected void generateNACKRoute(PeerID src, PeerID dest, Vector origHops) {

        // As long as the group is only partially initialized, do not bother
        // trying to send NACKs. We can't: it just causes NPEs.
        if (resolver == null) {
            return;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("generate NACK Route response " + src);
        }

        // Check first whether we are already in the process of looking for a
        // route to the destination peer of the NACK. We should not try to
        // send a NACK to that destination at that point, as this would block
        // our incoming processing thread while it looks for a route to
        // that destination. If there are no pending route requests to that
        // destination then we can go ahead and attempt to send the NACK. At
        // most we should have only one thread blocked while looking for
        // a specific destination. When we find a route to the destination,
        // the next NACK will be processed and sent.
        if (router.isPendingRouteQuery(router.pid2addr(src))) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("drop NACK due to pending route discovery " + src);
            }
            return;
        }

        // Generate a route response
        RouteAdvertisement route = (RouteAdvertisement)
                AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());

        AccessPointAdvertisement ap = (AccessPointAdvertisement)
                AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());

        ap.setPeerID(dest);
        route.setDest(ap);
        route.setHops(origHops);

        // Set the route of the peer that detected the bad route
        RouteAdvertisement routeSrc = (RouteAdvertisement)
                AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());

        AccessPointAdvertisement apSrc = (AccessPointAdvertisement)
                AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());

        apSrc.setPeerID((PeerID) localPeerId);
        routeSrc.setDest(apSrc);

        RouteResponse routeResponse = new RouteResponse();

        routeResponse.setDestRoute(route);
        routeResponse.setSrcRoute(routeSrc);

        ResolverResponse res = new ResolverResponse(routerSName, credentialDoc, NACKROUTE_QUERYID, // means NACK route
                routeResponse.toString());

        // send the NACK response back to the originator
        resolver.sendResponse(src.toString(), res);
    }

    /**
     * Process an SRDI message request.
     *
     * @param message SRDI resolver message
     */
    public boolean processSrdi(ResolverSrdiMsg message) {

        String value;
        SrdiMessage srdiMsg;

        try {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Received an SRDI message in group " + group.getPeerGroupName());
            }

            XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8,
                    new StringReader(message.getPayload()));

            srdiMsg = new SrdiMessageImpl(asDoc);
        } catch (Exception e) {
            // we don't understand this msg, let's skip it
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("corrupted SRDI message", e);
            }

            return false;
        }

        PeerID pid = srdiMsg.getPeerID();

        // Filter out messages about the local peer,
        // so we don't create a self-reference.
        if (pid.equals(localPeerId)) {
            return false;
        }

        Iterator eachEntry = srdiMsg.getEntries().iterator();

        while (eachEntry.hasNext()) {
            SrdiMessage.Entry entry = (SrdiMessage.Entry) eachEntry.next();

            // drop any information about ourself
            if (entry.key.equals(localPeerId.toString())) {
                continue;
            }

            value = entry.value;
            if (value == null) {
                value = "";
            }

            // Expiration of entries is taken care of by SrdiIndex, so we always add.
            // FIXME hamada 03142003
            // All routes are added under the secondary key 'DstPID'. It would be more correct to
            // specify it in the message, but since versioning is not yet supported the following is
            // acceptable, since it is localized.
            srdiIndex.add(srdiMsg.getPrimaryKey(), "DstPID", entry.key, pid, entry.expiration);

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [DstPID]" + " value [" + entry.key + "] exp [" + entry.expiration + "]");
            }
        }

        return true;
    }

    /**
     *  SrdiInterface
     */
    public void pushEntries(boolean all) {
        // only send to the replica
        pushSrdi(null, all);
    }

    /*
     * Push all SRDI entries to the rendezvous SRDI cache (new connection).
     *
     * @param all if true push all entries, otherwise just deltas
     */
    protected void pushSrdi(String peer, boolean all) {

        SrdiMessage srdiMsg;
        Vector routeIx = new Vector();

        // 10182002tra: Route info doesn't expire unless the peer disappears.
        // This approach is used to limit the SRDI traffic. The key
        // point here is that SRDI is used to tell a peer that another peer
        // has a route to the destination it is looking for. What the SRDI
        // caches is not so much the specific route info but rather
        // the fact that a peer has knowledge of a route to another peer.
        // We don't want to update the SRDI cache on every route update.
        // The SRDI cache will be flushed when the peer disconnects from
        // the rendezvous.

        // We cannot support concurrent modification of the map while we
        // do that: we must synchronize...
        for (Iterator each = router.getAllRoutedRouteAddresses(); each.hasNext();) {
            PeerID pid = router.addr2pid((EndpointAddress) each.next());
            SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE);

            routeIx.addElement(entry);
        }

        try {

            // check if we have anything to send
            if (routeIx.size() == 0) {
                return;
            }

            srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // one hop
                    "route", routeIx);

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Sending an SRDI message of [All=" + all + "] routes");
            }

            // this will replicate the entries to the SRDI replica peers
            srdi.replicateEntries(srdiMsg);

        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("SRDI Push failed", e);
            }
        }
    }

    /*
     * Push an SRDI entry to the SRDI rendezvous cache.
     *
     * @param peer peer to push to, or null to push to the replica peer
     * @param id peer id of the route to push
     */
    protected void pushSrdi(ID peer, PeerID id) {

        SrdiMessage srdiMsg;

        try {
            srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
                    "route", id.toString(), null, Long.MAX_VALUE); // maximum expiration

            // 10182002tra: Route info doesn't expire unless the peer disappears.
            // This approach is used to limit the SRDI traffic. The key
            // point here is that SRDI is used to tell a peer that another peer
            // has a route to the destination it is looking for. What the SRDI
            // caches is not so much the specific route info but rather
            // the fact that a peer has knowledge of a route to another peer.
            // We don't want to update the SRDI cache on every route update.
            // The SRDI cache will be flushed when the peer disconnects from
            // the rendezvous.

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("sending a router SRDI message add route " + id);
            }

            if (peer == null) {
                PeerID destPeer = srdi.getReplicaPeer(id.toString());

                peer = destPeer;
            }

            // don't push anywhere if we do not have a replica
            // or we are trying to send the query to ourself
            if (!localPeerId.equals(peer)) {
                srdi.pushSrdi(peer, srdiMsg);
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("SRDI push failed", e);
            }
        }
    }

    /**
     * Remove an SRDI cache entry.
     *
     * @param peer peer id we send the request to, null for sending to all
     * @param id peer id of the SRDI route that we want to remove
     *        from the cache
     */
    protected void removeSrdi(String peer, PeerID id) {

        SrdiMessage srdiMsg;

        try {
            srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop
                    "route", id.toString(), null, 0L); // 0 means remove

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("sending a router SRDI message delete route " + id);
            }

            if (peer == null) {
                PeerID destPeer = srdi.getReplicaPeer(id.toString());

                // don't push anywhere if we do not have a replica
                // or we are trying to push to ourself
                if (destPeer != null && (!destPeer.equals(localPeerId))) {
                    srdi.pushSrdi(destPeer, srdiMsg);
                }
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Removing SRDI entry failed", e);
            }
        }
    }

    /**
     *  {@inheritDoc}
     **/
    public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {
        // When the resolver fails to send, we get a notification and
        // flush the SRDI cache entries for that destination.
        removeSrdiIndex(peerid);
    }

    /**
     * Clean up any edge peers when trying to forward an SRDI query,
     * so we are guaranteed, to the best of our knowledge, that
     * the peer is a rendezvous. This is not perfect, as it may
     * take time for the peerview to converge, but at least
     * we can remove any peer that is not a rendezvous.
     **/
    protected Vector cleanupAnyEdges(String src, Vector results) {
        Vector clean = new Vector(results.size());
        PeerID pid = null;

        // get the peerview as a vector of PIDs
        Vector rpvId = srdi.getGlobalPeerView();

        // Remove any peers not in the current peerview;
        // these peers may be gone or may have become edges.
        for (int i = 0; i < results.size(); i++) {
            pid = (PeerID) results.elementAt(i);

            // Eliminate the src of the query so we don't resend
            // the query to whoever sent it to us.
            if (src.equals(pid.toString())) {
                continue;
            }

            // Remove the local peer also, so we don't send to ourself.
            if (localPeerId.equals(pid)) {
                continue;
            }

            if (rpvId.contains(pid)) { // ok, that's a good RDV to the best of our knowledge
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("valid rdv for SRDI forward " + pid);
                }
                clean.add(pid);
            } else {
                // clean up our SRDI cache for that peer
                srdiIndex.remove(pid);
            }
        }

        return clean;
    }

    /**
     * Return the global peerview.
     *
     * @return Vector of the RDV members of the peerview
     */
    protected Vector getGlobalPeerView() {
        return srdi.getGlobalPeerView();
    }

    /**
     * Remove an SRDI index entry.
     *
     * @param pid peer id of the index entry to be removed
     */
    protected void removeSrdiIndex(PeerID pid) {
        srdiIndex.remove(pid);
    }
}
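
The listing signals an invalid route by reusing the resolver response path with a reserved query ID, NACKROUTE_QUERYID, as described in generateNACKRoute() above. As a rough sketch of that convention only, the hypothetical dispatcher below shows how a receiver could branch on the query ID to tell a NACK apart from a normal route response; the class, the method names, and the stand-in constant value are assumptions for illustration and do not reproduce the actual JXTA resolver API.

// Hypothetical sketch; not part of routeresolver.java or the JXTA API.
class RouteResponseDispatcherSketch {

    // Stand-in for the NACKROUTE_QUERYID constant referenced in the listing;
    // the value used by the real implementation may differ.
    static final int NACKROUTE_QUERYID = -1;

    /** Handle an incoming resolver response based on its query id. */
    void dispatch(int queryId, String routeResponseXml) {
        if (queryId == NACKROUTE_QUERYID) {
            // Negative response: the reported route is bad and the receiver
            // would typically drop it from its route table / SRDI cache.
            onBadRoute(routeResponseXml);
        } else {
            // Ordinary response to one of our own route queries.
            onRouteFound(queryId, routeResponseXml);
        }
    }

    void onBadRoute(String routeResponseXml) {
        // placeholder: remove the cached route described by routeResponseXml
    }

    void onRouteFound(int queryId, String routeResponseXml) {
        // placeholder: record the route described by routeResponseXml
    }
}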
