📄 srdi.java
字号:
/** * Forwards a Query to a specific peer * hopCount is incremented to indicate this query is forwarded * * @param peer peerid to forward query to * @param query The query */ public void forwardQuery(Object peer, ResolverQueryMsg query) { query.incrementHopCount(); if (query.getHopCount() > 2) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("hopCount exceeded. Not forwarding query " + query.getHopCount()); } // query has been forwarded too many times return; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding Query to " + peer); } resolver.sendQuery(peer.toString(), query); } /** * Forwards a Query to a list of peers * hopCount is incremented to indicate this query is forwarded * * @param peers The peerids to forward query to * @param query The query */ public void forwardQuery(Vector peers, ResolverQueryMsg query) { query.incrementHopCount(); if (query.getHopCount() > 2) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("hopCount exceeded not forwarding query " + query.getHopCount()); } // query has been forwarded too many times return; } for (int i = 0; i < peers.size(); i++) { PeerID peer = (PeerID) peers.elementAt(i); String destPeer = peer.toString(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding Query to " + destPeer); } resolver.sendQuery(destPeer, query); } } /** * Forwards a Query to a list of peers * if the list of peers exceeds threshold, and random threshold is picked * from <code>peers</code> * hopCount is incremented to indicate this query is forwarded * * @param peers The peerids to forward query to * @param query The query * @param threshold number of peers to forward the query to */ public void forwardQuery(Vector peers, ResolverQueryMsg query, int threshold) { if (query.getHopCount() > 2) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "[" + group.getPeerGroupName() + " / " + handlername + "] hopCount exceeded (" + query.getHopCount() + ") not forwarding query."); } // query has been forwarded too many times return; } if (peers.size() <= threshold) { forwardQuery(peers, query); } else { // pick some random entries out of the list Vector newPeers = randomResult(peers, threshold); forwardQuery(newPeers, query); } } /** *returns a random vector(threshold) from a given vector * * @param result starting set * @param threshold sub-set desired * @return sub-list of result */ protected Vector randomResult(Vector result, int threshold) { if (threshold < result.size()) { Vector res = new Vector(threshold); for (int i = 0; i < threshold; i++) { int rand = random.nextInt(result.size()); res.addElement(result.elementAt(rand)); result.removeElementAt(rand); } return res; } return result; } /** * Given an expression return a peer from the list peers in the peerview * this function is used to to give a replication point, and entry point * to query on a pipe * * @param expression expression to derive the mapping from * @return The replicaPeer value */ public PeerID getReplicaPeer(String expression) { PeerID pid = null; Vector rpv = getGlobalPeerView(); if (rpv.size() >= RPV_REPLICATION_THRESHOLD) { BigInteger digest = null; synchronized(jxtaHash) { jxtaHash.update(expression); digest = jxtaHash.getDigestInteger().abs(); } BigInteger sizeOfSpace = java.math.BigInteger.valueOf(rpv.size()); BigInteger sizeOfHashSpace = BigInteger.ONE.shiftLeft(8 * digest.toByteArray().length); int pos = (digest.multiply(sizeOfSpace)).divide(sizeOfHashSpace).intValue(); pid = (PeerID) rpv.elementAt(pos); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Found a direct peer " + pid); } return pid; } else { return null; } } /** * forward srdi message to another peer * * @param peerid PeerID to forward query to * @param srcPid The source originator * @param primaryKey primary key * @param secondarykey secondary key * @param value value of the entry * @param expiration expiration in ms */ public void forwardSrdiMessage(PeerID peerid, PeerID srcPid, String primaryKey, String secondarykey, String value, long expiration) { try { SrdiMessageImpl srdi = new SrdiMessageImpl(srcPid, // ttl of 0, avoids additional replication 0, primaryKey, secondarykey, value, expiration); ResolverSrdiMsgImpl resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString()); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "[" + group.getPeerGroupName() + " / " + handlername + "] Forwarding a SRDI messsage of type " + primaryKey + " to " + peerid); } resolver.sendSrdi(peerid.toString(), (ResolverSrdiMsg) resSrdi); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed forwarding SRDI Message", e); } } } /** * {@inheritDoc} * */ public void rendezvousEvent(RendezvousEvent event) { int theEventType = event.getType(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Processing " + event); } switch (theEventType) { case RendezvousEvent.RDVCONNECT: // This is an initial connection, we need to upload the // complete index. republish = true; /* * FALLSTHRU */ case RendezvousEvent.RDVRECONNECT: // This is just a renewal of the rdv lease. Nothing special to do. synchronized(rdvEventLock) { // wake up the publish thread now. rdvEventLock.notify(); } break; case RendezvousEvent.CLIENTCONNECT: case RendezvousEvent.CLIENTRECONNECT: case RendezvousEvent.BECAMERDV: case RendezvousEvent.BECAMEEDGE: // XXX 20031110 bondolo@jxta.org perhaps becoming edge one should cause it to wake up so that run() switch to // don't do anything. break; case RendezvousEvent.RDVFAILED: case RendezvousEvent.RDVDISCONNECT: republish = true; break; case RendezvousEvent.CLIENTFAILED: case RendezvousEvent.CLIENTDISCONNECT: // we should flush the cache for the peer synchronized(rdvEventLock) { if (group.isRendezvous() && (srdiIndex != null)) { srdiIndex.remove((PeerID) event.getPeerID()); } } break; default: if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("[" + group.getPeerGroupName() + " / " + handlername + "] Unexpected RDV event " + event); } break; } } /** * {@inheritDoc} * * <p/>Main processing method for the SRDI Worker thread * Send all entries, wait for pushInterval, then send deltas */ public void run() { boolean waitingForRdv; try { while (!stop) { waitingForRdv = group.isRendezvous() || !group.getRendezVousService().isConnectedToRendezVous(); // upon connection we will have to republish republish |= waitingForRdv; synchronized(rdvEventLock) { // wait until we stop being a rendezvous or connect to a rendezvous if (waitingForRdv) { try { rdvEventLock.wait(connectPollInterval); } catch (InterruptedException e) { Thread.interrupted(); } continue; } if (!republish) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug( "[" + group.getPeerGroupName() + " / " + handlername + "] Sleeping for " + pushInterval + "ms before sending deltas."); } try { rdvEventLock.wait(pushInterval); } catch (InterruptedException e) { Thread.interrupted(); continue; } if (stop) { break; } } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("[" + group.getPeerGroupName() + " / " + handlername + "] Pushing " + (republish ? "all entries" : "deltas")); } srdiService.pushEntries(republish); republish = false; } } catch (Throwable all) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Uncaught Throwable in " + Thread.currentThread().getName() + "[" + group.getPeerGroupName() + " / " + handlername + "]", all); } } } /** * get the global peerview as the rendezvous service only returns * the peerview without the local RDV peer. We need this * consistent view for the SRDI index if not each RDV will have a * different peerview, off setting the index even when the peerview * is stable * * @return the sorted list */ public Vector getGlobalPeerView() { Vector global = new Vector(); SortedSet set = new TreeSet(); try { // get the local peerview Vector rpv = group.getRendezVousService().getLocalWalkView(); Iterator eachPVE = rpv.iterator(); while (eachPVE.hasNext()) { RdvAdvertisement padv = (RdvAdvertisement) eachPVE.next(); set.add(padv.getPeerID().toString()); } // add myself set.add(group.getPeerID().toString()); // produce a vector of Peer IDs Iterator eachPeerID = set.iterator(); while (eachPeerID.hasNext()) { try { PeerID id = (PeerID) IDFactory.fromURI(new URI((String) eachPeerID.next())); global.add(id); } catch (URISyntaxException badID) { throw new IllegalArgumentException("Bad PeerID ID in advertisement"); } catch (ClassCastException badID) { throw new IllegalArgumentException("ID was not a peerID"); } } } catch (Exception ex) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failure generating the global view", ex); } } return global; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -