⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 routeresolver.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
            resolver.sendResponse(query.getSrcPeer().toString(), res);            return ResolverService.OK;        } catch (Exception ee) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "processQuery: error while processing query ", ee);            }            return ResolverService.OK;        }    }    /**     * Return a route error in case a route was found to be invalid     * as the current hop cannot find a way to forward the message to the     * destination or any other hops in the forward part of the route.     * In that case a negative route response is forwarded     * to the original source of the message. Now of course we     * do not have any way to guarantee that the NACK message will be     * received by the sender, but the best we can do is to try :-)     * <p/>     * we send a query ID to NACKROUTE_QUERYID to indicate     * this is a bad Route     *     * @param src      original source of the message     * @param dest     original destination of the message     * @param origHops original hops     */    protected void generateNACKRoute(PeerID src, PeerID dest, Vector<AccessPointAdvertisement> origHops) {        // As long as the group is partially initialized, do not bother        // trying to send NACKS. We can't: it just causes NPEs.        if (resolver == null) {            return;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("generate NACK Route response " + src);        }        // check first, if we are not already in process of looking for a        // route to the destination peer of the NACK. We should not try to        // send a NACK to that destination at that point as this will block        // our incoming processing thread while it is looking for a route to        // that destination. If there a no pending route requests to that        // destination then we can go ahead an attempt to send the NACK. At        // the maximum we should have only one thread block while looking for        // a specific destination. When we find a route to the destination,        // the next NACK processing will be sent.        if (router.isPendingRouteQuery(src)) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("drop NACK due to pending route discovery " + src);            }            return;        }        // Generate a route response        RouteAdvertisement route = (RouteAdvertisement)                AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());        AccessPointAdvertisement ap = (AccessPointAdvertisement)                AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());        ap.setPeerID(dest);        route.setDest(ap);        route.setHops(origHops);        // set the the route of the peer that        // detected the bad route        RouteAdvertisement routeSrc = (RouteAdvertisement)                AdvertisementFactory.newAdvertisement(RouteAdvertisement.getAdvertisementType());        AccessPointAdvertisement apSrc = (AccessPointAdvertisement)                AdvertisementFactory.newAdvertisement(AccessPointAdvertisement.getAdvertisementType());        apSrc.setPeerID((PeerID) localPeerId);        routeSrc.setDest(apSrc);        RouteResponse routeResponse = new RouteResponse();        routeResponse.setDestRoute(route);        routeResponse.setSrcRoute(routeSrc);                ResolverResponse res = new ResolverResponse();        res.setHandlerName(routerSName);                CurrentCredential current = currentCredential;        if (null != current) {            res.setCredential(current.credentialDoc);         }                 res.setQueryId(NACKROUTE_QUERYID);                res.setResponse(routeResponse.toString());        // send the NACK response back to the originator        resolver.sendResponse(src.toString(), res);    }    /**     * process an SRDI message request     *     * @param message SRDI resolver message     */    public boolean processSrdi(ResolverSrdiMsg message) {        if(!group.isRendezvous()) {            return true;        }                String value;        SrdiMessage srdiMsg;        try {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Received a SRDI messsage in group" + group.getPeerGroupName());            }            XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, new StringReader(message.getPayload()));            srdiMsg = new SrdiMessageImpl(asDoc);        } catch (Exception e) {            // we don't understand this msg, let's skip it            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "corrupted SRDI message", e);            }            return false;        }        PeerID pid = srdiMsg.getPeerID();        // filter messages that contain messages        // about the local peer, so we don't enter        // self-reference        if (pid.equals(localPeerId)) {            return false;        }        for (SrdiMessage.Entry entry : srdiMsg.getEntries()) {            // drop any information  about ourself            if (entry.key.equals(localPeerId.toString())) {                continue;            }            value = entry.value;            if (value == null) {                value = "";            }            // Expiration of entries is taken care of by SrdiIdex, so we always add            // FIXME hamada 20030314            // All routes are added under the secondary key 'DstPID', it would be more correct to            // Specify it in the message, but since versioning is not yet supported the following is            // acceptable, since it is localized            srdiIndex.add(srdiMsg.getPrimaryKey(), RouteAdvertisement.DEST_PID_TAG, entry.key, pid, entry.expiration);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Primary Key [" + srdiMsg.getPrimaryKey() + "] key [RouteAdvertisement.DEST_PID_TAG]" + " value [" + entry.key + "] exp [" + entry.expiration + "]");            }        }        return true;    }    /**     * {@inheritDoc}     */    public void pushEntries(boolean all) {        // only send to the replica        pushSrdi(null, all);    }    /*     * push all srdi entries to the rednezvous SRDI cache (new connection)     *     *@param all if true push all entries, otherwise just deltas     */    protected void pushSrdi(String peer, boolean all) {        SrdiMessage srdiMsg;        Vector<SrdiMessage.Entry> routeIx = new Vector<SrdiMessage.Entry>();        // 20021018 tra:Route info don't expire unless the peer disappears        // This approach is used to limit the SRDI traffic. The key        // point here is that SRDI is used to tell a peer that another        // has a route to the destination it is looking for. The information        // that SRDI cache is not so much the specific route info but rather        // the fact that a peer has knowledge of a route to another peer        // We don't want to update the SRDI cache on every route update.        // The SRDI cache will be flushed when the peer disconnect from        // the rendezvous.        // We cannot support concurrent modification of the map while we        // do that: we must synchronize...        for (Iterator<ID> each = router.getAllRoutedRouteAddresses(); each.hasNext();) {            ID pid = each.next();            SrdiMessage.Entry entry = new SrdiMessage.Entry(pid.toString(), "", Long.MAX_VALUE);            routeIx.addElement(entry);        }        try {            // check if we have anything to send            if (routeIx.size() == 0) {                return;            }            srdiMsg = new SrdiMessageImpl(group.getPeerID(),                    // one hop                    1,                    "route", routeIx);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Sending a SRDI messsage of [All=" + all + "] routes");            }            // this will replicate entry to the  SRDI replica peers            srdi.replicateEntries(srdiMsg);        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "SRDI Push failed", e);            }        }    }    /*     * push srdi entries to the SRDI rendezvous cache     * @param all if true push all entries, otherwise just deltas     */    protected void pushSrdi(ID peer, PeerID id) {        SrdiMessage srdiMsg;        try {            srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop                    "route", id.toString(), null, Long.MAX_VALUE); // maximum expiration            // 20021018 tra:Route info don't expire unless the peer disappears            // This approach is used to limit the SRDI traffic. The key            // point here is that SRDI is used to tell a peer that another            // has a route to the destination it is looking for. The information            // that SRDI cache is not so much the specific route info but rather            // the fact that a peer has knowledge of a route to another peer            // We don't want to update the SRDI cache on every route update.            // The SRDI cache will be flushed when the peer disconnect from            // the rendezvous.            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("sending a router SRDI message add route " + id);            }            if (peer == null) {                PeerID destPeer = srdi.getReplicaPeer(id.toString());                peer = destPeer;            }            // don't push anywhere if we do not have a replica            // or we are trying to send the query to ourself            if (!localPeerId.equals(peer)) {                srdi.pushSrdi(peer, srdiMsg);            }        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "SRDI push failed", e);            }        }    }    /**     * remove a SRDI cache entry     *     * @param peer peer id we send the request, null for sending to all     * @param id   peer id of the SRDI route that we want to remove     *             from the cache     */    protected void removeSrdi(String peer, PeerID id) {        SrdiMessage srdiMsg;        try {            srdiMsg = new SrdiMessageImpl(group.getPeerID(), 1, // only one hop                    "route", id.toString(), null, // 0 means remove                    0);            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("sending a router SRDI message delete route " + id);            }            if (peer == null) {                PeerID destPeer = srdi.getReplicaPeer(id.toString());                // don't push anywhere if we do not have replica                // or we are trying to push to ouself                if (destPeer != null && (!destPeer.equals(localPeerId))) {                    srdi.pushSrdi(destPeer, srdiMsg);                }            }        } catch (Exception e) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "Removing srdi entry failed", e);            }        }    }    /**     * {@inheritDoc}     */    public void messageSendFailed(PeerID peerid, OutgoingMessageEvent e) {        // when the resolver failed to send, we get a notification and        // flush the SRDI cache entries for that destination        removeSrdiIndex(peerid);    }    /**     * cleanup any edge peers when trying to forward an SRDI query so we are     * guaranteed to the best of our knowledge that the peer is a rendezvous.     * This is not perfect, as it may take time for the peerview to converge but     * at least we can remove any peers that is not a rendezvous.     *     * @param src     source     * @param results vector of PeerIDs     * @return cleaned up vector of PeerIDs     */    protected List<PeerID> cleanupAnyEdges(ID src, List<PeerID> results) {        List<PeerID> clean = new ArrayList<PeerID>(results.size());        // put the peerview as a vector of PIDs        List<PeerID> rpvId = srdi.getGlobalPeerView();        // remove any peers not in the current peerview        // these peers may be gone or have become edges        for (PeerID pid : results) {            // eliminate the src of the query so we don't resend            // the query to whom send it to us            if (src.equals(pid)) {                continue;            }            // remove the local also, so we don't send to ourself            if (localPeerId.equals(pid)) {                continue;            }            if (rpvId.contains(pid)) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("valid rdv for SRDI forward " + pid);                }                clean.add(pid);            } else {                // cleanup our SRDI cache for that peer                srdiIndex.remove(pid);            }        }        return clean;    }    /**     * remove SRDI index     *     * @param pid of the index to be removed     */    protected void removeSrdiIndex(PeerID pid) {        srdiIndex.remove(pid);    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -