⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 srdi.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /**     *  Forwards a Query to a specific peer     *  hopCount is incremented to indicate this query is forwarded     *     * @param  peer   peerid to forward query to     * @param  query  The query     */    public void forwardQuery(Object peer, ResolverQueryMsg query) {        query.incrementHopCount();        if (query.getHopCount() > 2) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("hopCount exceeded. Not forwarding query " + query.getHopCount());            }            // query has been forwarded too many times            return;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding Query to " + peer);        }        resolver.sendQuery(peer.toString(), query);    }    /**     *  Forwards a Query to a list of peers     *  hopCount is incremented to indicate this query is forwarded     *     * @param  peers  The peerids to forward query to     * @param  query  The query     */    public void forwardQuery(Vector peers, ResolverQueryMsg query) {        query.incrementHopCount();        if (query.getHopCount() > 2) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("hopCount exceeded not forwarding query " + query.getHopCount());            }            // query has been forwarded too many times            return;        }        for (int i = 0; i < peers.size(); i++) {            PeerID peer = (PeerID) peers.elementAt(i);            String destPeer = peer.toString();            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding Query to " + destPeer);            }            resolver.sendQuery(destPeer, query);        }    }    /**     * Forwards a Query to a list of peers     * if the list of peers exceeds threshold, and random threshold is picked     * from <code>peers</code>     * hopCount is incremented to indicate this query is forwarded     *     * @param  peers      The peerids to forward query to     * @param  query      The query     * @param  threshold  number of peers to forward the query to     */    public void forwardQuery(Vector peers,                             ResolverQueryMsg query,                             int threshold) {        if (query.getHopCount() > 2) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug(                        "[" + group.getPeerGroupName() + " / " + handlername + "] hopCount exceeded (" + query.getHopCount()                         + ") not forwarding query.");            }            // query has been forwarded too many times            return;        }        if (peers.size() <= threshold) {            forwardQuery(peers, query);        } else {            // pick some random entries out of the list            Vector newPeers = randomResult(peers, threshold);            forwardQuery(newPeers, query);        }    }    /**     *returns a random vector(threshold) from a given vector     *     * @param  result     starting set     * @param  threshold  sub-set desired     * @return            sub-list of result     */    protected Vector randomResult(Vector result, int threshold) {        if (threshold < result.size()) {            Vector res = new Vector(threshold);            for (int i = 0; i < threshold; i++) {                int rand = random.nextInt(result.size());                res.addElement(result.elementAt(rand));                result.removeElementAt(rand);            }            return res;        }        return result;    }    /**     *  Given an expression return a peer from the list peers in the peerview     *  this function is used to to give a replication point, and entry point     *  to query on a pipe     *     * @param  expression  expression to derive the mapping from     * @return             The replicaPeer value     */    public PeerID getReplicaPeer(String expression) {        PeerID pid = null;        Vector rpv = getGlobalPeerView();        if (rpv.size() >= RPV_REPLICATION_THRESHOLD) {            BigInteger digest = null;            synchronized(jxtaHash) {                jxtaHash.update(expression);                digest = jxtaHash.getDigestInteger().abs();            }            BigInteger sizeOfSpace = java.math.BigInteger.valueOf(rpv.size());            BigInteger sizeOfHashSpace = BigInteger.ONE.shiftLeft(8 * digest.toByteArray().length);            int pos = (digest.multiply(sizeOfSpace)).divide(sizeOfHashSpace).intValue();            pid = (PeerID) rpv.elementAt(pos);            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Found a direct peer " + pid);            }            return pid;        } else {            return null;        }    }    /**     *  forward srdi message to another peer     *     * @param  peerid        PeerID to forward query to     * @param  srcPid        The source originator     * @param  primaryKey    primary key     * @param  secondarykey  secondary key     * @param  value         value of the entry     * @param  expiration    expiration in ms     */    public void forwardSrdiMessage(PeerID peerid,            PeerID srcPid,            String primaryKey,            String secondarykey,            String value,            long expiration) {        try {            SrdiMessageImpl srdi = new SrdiMessageImpl(srcPid,            // ttl of 0, avoids additional replication            0, primaryKey, secondarykey, value, expiration);            ResolverSrdiMsgImpl resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug(                        "[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding a SRDI messsage of type " + primaryKey + " to " + peerid);            }            resolver.sendSrdi(peerid.toString(), (ResolverSrdiMsg) resSrdi);        } catch (Exception e) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Failed forwarding SRDI Message", e);            }        }    }    /**     * {@inheritDoc}     *     */    public void rendezvousEvent(RendezvousEvent event) {        int theEventType = event.getType();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Processing " + event);        }        switch (theEventType) {            case RendezvousEvent.RDVCONNECT:                // This is an initial connection, we need to upload the                // complete index.                republish = true;            /*             *  FALLSTHRU             */            case RendezvousEvent.RDVRECONNECT:                // This is just a renewal of the rdv lease. Nothing special to do.                synchronized(rdvEventLock) {                    // wake up the publish thread now.                    rdvEventLock.notify();                }                break;            case RendezvousEvent.CLIENTCONNECT:            case RendezvousEvent.CLIENTRECONNECT:            case RendezvousEvent.BECAMERDV:            case RendezvousEvent.BECAMEEDGE:                // XXX 20031110 bondolo@jxta.org perhaps becoming edge one should cause it to wake up so that run() switch to                // don't do anything.                break;            case RendezvousEvent.RDVFAILED:            case RendezvousEvent.RDVDISCONNECT:                republish = true;                break;            case RendezvousEvent.CLIENTFAILED:            case RendezvousEvent.CLIENTDISCONNECT:                // we should flush the cache for the peer                synchronized(rdvEventLock) {                    if (group.isRendezvous() && (srdiIndex != null)) {                        srdiIndex.remove((PeerID) event.getPeerID());                    }                }                break;            default:                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("[" + group.getPeerGroupName() + " / " + handlername + "] Unexpected RDV event " + event);                }                break;        }    }    /**     * {@inheritDoc}     *     *  <p/>Main processing method for the SRDI Worker thread     *  Send all entries, wait for pushInterval, then send deltas     */    public void run() {        boolean waitingForRdv;        try {            while (!stop) {                waitingForRdv = group.isRendezvous() || !group.getRendezVousService().isConnectedToRendezVous();                // upon connection we will have to republish                republish |= waitingForRdv;                synchronized(rdvEventLock) {                    // wait until we stop being a rendezvous or connect to a rendezvous                    if (waitingForRdv) {                        try {                            rdvEventLock.wait(connectPollInterval);                        } catch (InterruptedException e) {                            Thread.interrupted();                        }                        continue;                    }                    if (!republish) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug(                                    "[" + group.getPeerGroupName() + " / " + handlername + "] Sleeping for " + pushInterval                                     + "ms before sending deltas.");                        }                        try {                            rdvEventLock.wait(pushInterval);                        } catch (InterruptedException e) {                            Thread.interrupted();                            continue;                        }                        if (stop) {                            break;                        }                    }                }                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Pushing " + (republish ? "all entries" : "deltas"));                }                srdiService.pushEntries(republish);                republish = false;            }        } catch (Throwable all) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Uncaught Throwable in " + Thread.currentThread().getName() + "[" + group.getPeerGroupName() + " / " + handlername + "]",                        all);            }        }    }    /**     * get the global peerview as the rendezvous service only returns     * the peerview without the local RDV peer.  We need this     * consistent view for the SRDI index if not each RDV will have a     * different peerview, off setting the index even when the peerview     * is stable     *     * @return    the sorted list     */    public Vector getGlobalPeerView() {        Vector global = new Vector();        SortedSet set = new TreeSet();        try {            // get the local peerview            Vector rpv = group.getRendezVousService().getLocalWalkView();            Iterator eachPVE = rpv.iterator();            while (eachPVE.hasNext()) {                RdvAdvertisement padv = (RdvAdvertisement) eachPVE.next();                set.add(padv.getPeerID().toString());            }            // add myself            set.add(group.getPeerID().toString());            // produce a vector of Peer IDs            Iterator eachPeerID = set.iterator();            while (eachPeerID.hasNext()) {                try {                    PeerID id = (PeerID) IDFactory.fromURI(new URI((String) eachPeerID.next()));                    global.add(id);                } catch (URISyntaxException badID) {                    throw new IllegalArgumentException("Bad PeerID ID in advertisement");                } catch (ClassCastException badID) {                    throw new IllegalArgumentException("ID was not a peerID");                }            }        } catch (Exception ex) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Failure generating the global view", ex);            }        }        return global;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -