⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 srdi.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
     * @param peer peer to push the message to; if peer is null the message is propagated
     * @param srdi SRDI message to send
     */
    public void pushSrdi(ID peer, SrdiMessage srdi) {
        try {
            ResolverSrdiMsg resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());

            if (null == peer) {
                resolver.sendSrdi(null, resSrdi);
            } else {
                resolver.sendSrdi(peer.toString(), resSrdi);
            }
        } catch (Exception e) {
            // Best effort: a failed push is logged, never propagated to the caller.
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed to send srdi message", e);
            }
        }
    }

    /**
     * Forwards a query to a specific peer.
     * <p/>
     * The hop count of the query is incremented to indicate that it has been
     * forwarded. If the incremented hop count is greater than 2 the query is
     * not sent.
     *
     * @param peer  peerid to forward query to
     * @param query The query
     */
    public void forwardQuery(PeerID peer, ResolverQueryMsg query) {
        query.incrementHopCount();
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("hopCount exceeded. Not forwarding query " + query.getHopCount());
            }
            // query has been forwarded too many times
            return;
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}",
                    group.getPeerGroupName(), handlername, peer));
        }
        resolver.sendQuery(peer.toString(), query);
    }

    /**
     * Forwards a query to a list of peers.
     * <p/>
     * The hop count of the query is incremented once (not once per peer) to
     * indicate that it has been forwarded. If the incremented hop count is
     * greater than 2 the query is not sent to anyone.
     *
     * @param peers The peerids to forward query to
     * @param query The query
     */
    public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query) {
        query.incrementHopCount();
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("hopCount exceeded not forwarding query {0}", query.getHopCount()));
            }
            // query has been forwarded too many times
            return;
        }

        for (PeerID destPeer : peers) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}",
                        group.getPeerGroupName(), handlername, destPeer));
            }
            resolver.sendQuery(destPeer.toString(), query);
        }
    }

    /**
     * Forwards a query to at most {@code threshold} peers.
     * <p/>
     * If {@code peers} holds more than {@code threshold} entries, a random
     * selection of {@code threshold} entries is picked from it and the query
     * is forwarded to that selection only. The hop count is incremented by
     * the {@link #forwardQuery(List, ResolverQueryMsg)} call this method
     * delegates to.
     *
     * @param peers     The peerids to forward query to
     * @param query     The query
     * @param threshold number of peers to forward the query to
     */
    public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query, int threshold) {
        // Checked before the increment done by the delegate, so an already
        // exhausted query is rejected without picking a random subset.
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] hopCount exceeded ({2}) not forwarding query.",
                        group.getPeerGroupName(), handlername, query.getHopCount()));
            }
            // query has been forwarded too many times
            return;
        }

        if (peers.size() <= threshold) {
            forwardQuery(peers, query);
        } else {
            // pick some random entries out of the list
            List<PeerID> newPeers = randomResult(peers, threshold);
            forwardQuery(newPeers, query);
        }
    }

    /**
     * Returns a random selection of {@code threshold} distinct entries of a
     * given list.
     * <p/>
     * The given list is never modified. When {@code threshold} is not smaller
     * than the size of the list, the list itself is returned.
     *
     * @param result    starting set
     * @param threshold sub-set desired
     * @return sub-list of result
     */
    protected List<PeerID> randomResult(List<PeerID> result, int threshold) {
        if (threshold < result.size()) {
            // Draw from a private copy: removing the picked entry guarantees
            // distinct picks without shrinking the caller's list.
            List<PeerID> pool = new ArrayList<PeerID>(result);
            List<PeerID> res = new ArrayList<PeerID>(threshold);

            for (int i = 0; i < threshold; i++) {
                int rand = random.nextInt(pool.size());
                res.add(pool.remove(rand));
            }
            return res;
        }
        return result;
    }

    /**
     * Given an expression, returns a peer from the global peerview.
     * This function is used to give a replication point, and an entry point
     * to query on a pipe.
     * <p/>
     * The expression is hashed and the digest is scaled linearly onto the
     * index range of the global peerview, so the same expression maps to the
     * same position for as long as the peerview is unchanged.
     *
     * @param expression expression to derive the mapping from
     * @return The replicaPeer value, or {@code null} when the global peerview
     *         holds fewer than {@code RPV_REPLICATION_THRESHOLD} peers
     */
    public PeerID getReplicaPeer(String expression) {
        PeerID pid;
        List<PeerID> rpv = getGlobalPeerView();

        if (rpv.size() >= RPV_REPLICATION_THRESHOLD) {
            BigInteger digest;

            // jxtaHash is shared state: update and read must happen atomically.
            synchronized (jxtaHash) {
                jxtaHash.update(expression);
                digest = jxtaHash.getDigestInteger().abs();
            }

            BigInteger sizeOfSpace = BigInteger.valueOf(rpv.size());
            // 2^(8 * digest byte length) is strictly greater than digest, so
            // pos below always lands in [0, rpv.size()).
            BigInteger sizeOfHashSpace = BigInteger.ONE.shiftLeft(8 * digest.toByteArray().length);

            int pos = (digest.multiply(sizeOfSpace)).divide(sizeOfHashSpace).intValue();

            pid = rpv.get(pos);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Found a direct peer {2}", group.getPeerGroupName(), handlername, pid));
            }
            return pid;
        } else {
            return null;
        }
    }

    /**
     * Forwards a single-entry SRDI message to another peer.
     * Failures are logged and not propagated to the caller.
     *
     * @param peerid       PeerID to forward query to
     * @param srcPid       The source originator
     * @param primaryKey   primary key
     * @param secondarykey secondary key
     * @param value        value of the entry
     * @param expiration   expiration in ms
     */
    public void forwardSrdiMessage(PeerID peerid, PeerID srcPid, String primaryKey, String secondarykey, String value, long expiration) {
        try {
            // ttl of 0, avoids additional replication
            SrdiMessageImpl srdi = new SrdiMessageImpl(srcPid, 0, primaryKey, secondarykey, value, expiration);
            ResolverSrdiMsgImpl resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding a SRDI messsage of type {2} to {3}", group.getPeerGroupName(),
                                handlername, primaryKey, peerid));
            }
            resolver.sendSrdi(peerid.toString(), resSrdi);
        } catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed forwarding SRDI Message", e);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Reacts to rendezvous events: a rendezvous connect wakes up the publish
     * thread, a rendezvous failure or disconnect requests a full republish,
     * and a client failure or disconnect removes that client's entries from
     * the SRDI index when this peer is a rendezvous.
     */
    @SuppressWarnings("fallthrough")
    public void rendezvousEvent(RendezvousEvent event) {
        int theEventType = event.getType();

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine(MessageFormat.format("[{0} / {1}] Processing {2}", group.getPeerGroupName(), handlername, event));
        }

        switch (theEventType) {
            case RendezvousEvent.RDVCONNECT:
                synchronized (rdvEventLock) {
                    // wake up the publish thread now.
                    rdvEventLock.notify();
                }
               /*
                *  FALLSTHRU
                */
            case RendezvousEvent.RDVRECONNECT:
                // No need to wake up the publish thread; reconnect should not force indices to be published.
                break;

            case RendezvousEvent.CLIENTCONNECT:
            case RendezvousEvent.CLIENTRECONNECT:
            case RendezvousEvent.BECAMERDV:
            case RendezvousEvent.BECAMEEDGE:
                // XXX 20031110 bondolo perhaps becoming edge one should cause it to wake up so that run() switch to
                // don't do anything.
                break;

            case RendezvousEvent.RDVFAILED:
            case RendezvousEvent.RDVDISCONNECT:
                // Picked up by run() on its next iteration, which then pushes all entries.
                republishSignal.set(true);
                break;

            case RendezvousEvent.CLIENTFAILED:
            case RendezvousEvent.CLIENTDISCONNECT:
                // we should flush the cache for the peer
                synchronized (rdvEventLock) {
                    if (group.isRendezvous() && (srdiIndex != null)) {
                        srdiIndex.remove((PeerID) event.getPeerID());
                    }
                }
                break;

            default:
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning(
                            MessageFormat.format("[{0} / {1}] Unexpected RDV event {2}", group.getPeerGroupName(), handlername, event));
                }
                break;
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Main processing method for the SRDI Worker thread.
     * Send all entries, wait for pushInterval, then send deltas.
     * <p/>
     * The loop runs until {@code stop} is set. Any Throwable escaping the
     * loop is logged at SEVERE level and ends the thread.
     */
    public void run() {
        boolean waitingForRdv;
        boolean republish = true;

        try {
            while (!stop) {
                // upon connection we will have to republish
                republish |= republishSignal.compareAndSet(true, false);

                // Entries are pushed only while this peer is not itself a
                // rendezvous, is connected to one, and is not in ad-hoc mode.
                waitingForRdv = group.isRendezvous() || !group.getRendezVousService().isConnectedToRendezVous() ||
                        group.getRendezVousService().getRendezVousStatus() == RendezVousStatus.ADHOC;

                if (!waitingForRdv) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + group.getPeerGroupName() + " / " + handlername + "] Pushing "
                                + (republish ? "all entries" : "deltas"));
                    }
                    srdiService.pushEntries(republish);
                    republish = false;
                }

                synchronized (rdvEventLock) {
                    try {
                        rdvEventLock.wait(waitingForRdv ? connectPollInterval : pushInterval);
                    } catch (InterruptedException e) {
                        // Deliberately not re-asserting the interrupt: with the flag set
                        // the next wait() would throw at once and the loop would spin
                        // until stop is set. The loop is terminated through stop only.
                        Thread.interrupted();
                    }
                }
            }
        } catch (Throwable all) {
            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                LOG.log(Level.SEVERE,
                        "Uncaught Throwable in " + Thread.currentThread().getName() + "[" + group.getPeerGroupName() + " / "
                                + handlername + "]",all);
            }
        }
    }

    /**
     * Gets the global peerview, as the rendezvous service only returns
     * the peerview without the local RDV peer. We need this
     * consistent view for the SRDI index; if not, each RDV will have a
     * different peerview, offsetting the index even when the peerview
     * is stable.
     * <p/>
     * The view is the local walk view plus this peer, ordered by the string
     * form of the peer IDs. If building the view fails, the failure is logged
     * and whatever was collected up to that point is returned.
     *
     * @return the sorted list
     */
    public Vector<PeerID> getGlobalPeerView() {
        Vector<PeerID> global = new Vector<PeerID>();
        // TreeSet gives both de-duplication and a stable ordering.
        SortedSet<String> set = new TreeSet<String>();

        try {
            // get the local peerview
            List<RdvAdvertisement> rpv = group.getRendezVousService().getLocalWalkView();

            for (RdvAdvertisement padv : rpv) {
                set.add(padv.getPeerID().toString());
            }

            // add myself
            set.add(group.getPeerID().toString());

            // produce a vector of Peer IDs
            for (String aSet : set) {
                try {
                    PeerID peerID = (PeerID) IDFactory.fromURI(new URI(aSet));
                    global.add(peerID);
                } catch (URISyntaxException badID) {
                    // Keep the cause so the warning logged below shows what was wrong.
                    throw new IllegalArgumentException("Bad PeerID ID in advertisement", badID);
                } catch (ClassCastException badID) {
                    throw new IllegalArgumentException("ID was not a peerID", badID);
                }
            }
        } catch (Exception ex) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failure generating the global view", ex);
            }
        }
        return global;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -