📄 srdi.java
字号:
* @param peer peer to push message to; if peer is null the message is propagated
     * @param srdi SRDI message to send
     */
    public void pushSrdi(ID peer, SrdiMessage srdi) {
        try {
            ResolverSrdiMsg resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());

            // A null destination is handed to the resolver unchanged.
            if (null == peer) {
                resolver.sendSrdi(null, resSrdi);
            } else {
                resolver.sendSrdi(peer.toString(), resSrdi);
            }
        } catch (Exception e) {
            // Best effort: a failed push is logged, never propagated to the caller.
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed to send srdi message", e);
            }
        }
    }

    /**
     * Forwards a query to a specific peer.
     * <p/>
     * The hop count of the query is incremented first to record the forward;
     * a query whose hop count then exceeds 2 is dropped instead of being sent.
     *
     * @param peer  peerid to forward query to
     * @param query The query
     */
    public void forwardQuery(PeerID peer, ResolverQueryMsg query) {
        query.incrementHopCount();
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine("hopCount exceeded. Not forwarding query " + query.getHopCount());
            }
            // query has been forwarded too many times
            return;
        }

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}", group.getPeerGroupName(), handlername, peer));
        }
        resolver.sendQuery(peer.toString(), query);
    }

    /**
     * Forwards a query to a list of peers.
     * <p/>
     * The hop count is incremented once for the whole batch (not once per
     * destination); a query whose hop count then exceeds 2 is dropped.
     *
     * @param peers The peerids to forward query to
     * @param query The query
     */
    public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query) {
        query.incrementHopCount();
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("hopCount exceeded not forwarding query {0}", query.getHopCount()));
            }
            // query has been forwarded too many times
            return;
        }

        for (PeerID destPeer : peers) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding Query to {2}", group.getPeerGroupName(), handlername, destPeer));
            }
            resolver.sendQuery(destPeer.toString(), query);
        }
    }

    /**
     * Forwards a query to a list of peers, capped at <code>threshold</code>
     * destinations.
     * <p/>
     * If <code>peers</code> holds more than <code>threshold</code> entries, a
     * random selection of <code>threshold</code> of them is used instead. The
     * hop count is incremented by the delegated
     * {@link #forwardQuery(List, ResolverQueryMsg)} call. The caller's list is
     * not modified.
     *
     * @param peers     The peerids to forward query to
     * @param query     The query
     * @param threshold number of peers to forward the query to
     */
    public void forwardQuery(List<PeerID> peers, ResolverQueryMsg query, int threshold) {
        // Checked before any selection work; the delegate re-checks after incrementing.
        if (query.getHopCount() > 2) {
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] hopCount exceeded ({2}) not forwarding query.", group.getPeerGroupName(), handlername, query.getHopCount()));
            }
            // query has been forwarded too many times
            return;
        }

        if (peers.size() <= threshold) {
            forwardQuery(peers, query);
        } else {
            // pick some random entries out of the list
            List<PeerID> newPeers = randomResult(peers, threshold);
            forwardQuery(newPeers, query);
        }
    }

    /**
     * Returns a random selection of <code>threshold</code> distinct positions
     * from the given list.
     * <p/>
     * The input list is never modified: entries are drawn from a private copy.
     * When <code>threshold</code> is not smaller than the list size, the input
     * list itself is returned.
     *
     * @param result    starting set
     * @param threshold sub-set desired
     * @return sub-list of result
     */
    protected List<PeerID> randomResult(List<PeerID> result, int threshold) {
        if (threshold >= result.size()) {
            return result;
        }

        List<PeerID> res = new ArrayList<PeerID>(threshold);
        // Draw without replacement from a copy so the caller's list keeps all its entries.
        List<PeerID> candidates = new ArrayList<PeerID>(result);

        for (int i = 0; i < threshold; i++) {
            int rand = random.nextInt(candidates.size());
            res.add(candidates.remove(rand));
        }
        return res;
    }

    /**
     * Given an expression, returns a peer from the global peerview.
     * <p/>
     * This function is used to give a replication point, and entry point to
     * query on a pipe. The expression is hashed and the digest is scaled onto
     * an index into the list returned by {@link #getGlobalPeerView()}.
     *
     * @param expression expression to derive the mapping from
     * @return The replicaPeer value, or <code>null</code> when the global
     *         peerview holds fewer than <code>RPV_REPLICATION_THRESHOLD</code> peers
     */
    public PeerID getReplicaPeer(String expression) {
        PeerID pid;
        List<PeerID> rpv = getGlobalPeerView();

        if (rpv.size() >= RPV_REPLICATION_THRESHOLD) {
            BigInteger digest;

            // update + read of the shared hash must not interleave with other callers
            synchronized (jxtaHash) {
                jxtaHash.update(expression);
                digest = jxtaHash.getDigestInteger().abs();
            }

            // pos = digest * |rpv| / 2^(8 * digestBytes); digest < 2^(8 * digestBytes), so pos < |rpv|
            BigInteger sizeOfSpace = java.math.BigInteger.valueOf(rpv.size());
            BigInteger sizeOfHashSpace = BigInteger.ONE.shiftLeft(8 * digest.toByteArray().length);

            int pos = (digest.multiply(sizeOfSpace)).divide(sizeOfHashSpace).intValue();

            pid = rpv.get(pos);
            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Found a direct peer {2}", group.getPeerGroupName(), handlername, pid));
            }
            return pid;
        } else {
            return null;
        }
    }

    /**
     * Forwards an SRDI message to another peer.
     * <p/>
     * The message is built with a ttl of 0. Failures are logged and not
     * propagated to the caller.
     *
     * @param peerid       PeerID to forward query to
     * @param srcPid       The source originator
     * @param primaryKey   primary key
     * @param secondarykey secondary key
     * @param value        value of the entry
     * @param expiration   expiration in ms
     */
    public void forwardSrdiMessage(PeerID peerid, PeerID srcPid, String primaryKey, String secondarykey, String value, long expiration) {
        try {
            SrdiMessageImpl srdi = new SrdiMessageImpl(srcPid,
                    // ttl of 0, avoids additional replication
                    0, primaryKey, secondarykey, value, expiration);

            ResolverSrdiMsgImpl resSrdi = new ResolverSrdiMsgImpl(handlername, credential, srdi.toString());

            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                LOG.fine(MessageFormat.format("[{0} / {1}] Forwarding a SRDI messsage of type {2} to {3}", group.getPeerGroupName(), handlername, primaryKey, peerid));
            }
            resolver.sendSrdi(peerid.toString(), resSrdi);
        } catch (Exception e) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed forwarding SRDI Message", e);
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Reacts to rendezvous events: a new rendezvous connection wakes the
     * publish thread, a lost rendezvous requests a full republish, and a lost
     * client has its entries removed from the SRDI index when this peer is a
     * rendezvous.
     */
    @SuppressWarnings("fallthrough")
    public void rendezvousEvent(RendezvousEvent event) {
        int theEventType = event.getType();

        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.fine(MessageFormat.format("[{0} / {1}] Processing {2}", group.getPeerGroupName(), handlername, event));
        }

        switch (theEventType) {
            case RendezvousEvent.RDVCONNECT:
                synchronized (rdvEventLock) {
                    // wake up the publish thread now.
                    rdvEventLock.notify();
                }
                /*
                 * FALLSTHRU
                 */
            case RendezvousEvent.RDVRECONNECT:
                // No need to wake up the publish thread; reconnect should not force indices to be published.
                break;

            case RendezvousEvent.CLIENTCONNECT:
            case RendezvousEvent.CLIENTRECONNECT:
            case RendezvousEvent.BECAMERDV:
            case RendezvousEvent.BECAMEEDGE:
                // XXX 20031110 bondolo perhaps becoming edge one should cause it to wake up so that run() switch to
                // don't do anything.
                break;

            case RendezvousEvent.RDVFAILED:
            case RendezvousEvent.RDVDISCONNECT:
                // picked up by run(): the next push sends all entries instead of deltas
                republishSignal.set(true);
                break;

            case RendezvousEvent.CLIENTFAILED:
            case RendezvousEvent.CLIENTDISCONNECT:
                // we should flush the cache for the peer
                synchronized (rdvEventLock) {
                    if (group.isRendezvous() && (srdiIndex != null)) {
                        srdiIndex.remove((PeerID) event.getPeerID());
                    }
                }
                break;

            default:
                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                    LOG.warning(
                            MessageFormat.format("[{0} / {1}] Unexpected RDV event {2}", group.getPeerGroupName(), handlername, event));
                }
                break;
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Main processing method for the SRDI Worker thread.
     * Send all entries, wait for pushInterval, then send deltas.
     * <p/>
     * While this peer is a rendezvous, is not connected to a rendezvous, or is
     * in ad-hoc mode, nothing is pushed and the loop polls every
     * <code>connectPollInterval</code> instead.
     */
    public void run() {
        boolean waitingForRdv;
        boolean republish = true;

        try {
            while (!stop) {
                // upon connection we will have to republish
                republish |= republishSignal.compareAndSet(true, false);

                waitingForRdv = group.isRendezvous()
                        || !group.getRendezVousService().isConnectedToRendezVous()
                        || group.getRendezVousService().getRendezVousStatus() == RendezVousStatus.ADHOC;

                if (!waitingForRdv) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + group.getPeerGroupName() + " / " + handlername + "] Pushing " + (republish ? "all entries" : "deltas"));
                    }
                    srdiService.pushEntries(republish);
                    republish = false;
                }

                synchronized (rdvEventLock) {
                    try {
                        rdvEventLock.wait(waitingForRdv ? connectPollInterval : pushInterval);
                    } catch (InterruptedException e) {
                        // Deliberately not re-asserted: the loop is governed by the stop flag,
                        // and a pending interrupt would make every following wait() fail at once.
                        Thread.interrupted();
                    }
                }
            }
        } catch (Throwable all) {
            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {
                LOG.log(Level.SEVERE, "Uncaught Throwable in " + Thread.currentThread().getName() + "[" + group.getPeerGroupName() + " / " + handlername + "]", all);
            }
        }
    }

    /**
     * Get the global peerview, as the rendezvous service only returns
     * the peerview without the local RDV peer. We need this
     * consistent view for the SRDI index; if not, each RDV will have a
     * different peerview, offsetting the index even when the peerview
     * is stable.
     * <p/>
     * The IDs are ordered by their string form. If building the view fails,
     * the failure is logged and whatever was collected so far is returned.
     *
     * @return the sorted list
     */
    public Vector<PeerID> getGlobalPeerView() {
        Vector<PeerID> global = new Vector<PeerID>();
        SortedSet<String> set = new TreeSet<String>();

        try {
            // get the local peerview
            List<RdvAdvertisement> rpv = group.getRendezVousService().getLocalWalkView();

            for (RdvAdvertisement padv : rpv) {
                set.add(padv.getPeerID().toString());
            }

            // add myself
            set.add(group.getPeerID().toString());

            // produce a vector of Peer IDs
            for (String aSet : set) {
                try {
                    PeerID peerID = (PeerID) IDFactory.fromURI(new URI(aSet));

                    global.add(peerID);
                } catch (URISyntaxException badID) {
                    // keep the cause so the warning logged below shows the offending URI
                    throw new IllegalArgumentException("Bad PeerID ID in advertisement", badID);
                } catch (ClassCastException badID) {
                    throw new IllegalArgumentException("ID was not a peerID", badID);
                }
            }
        } catch (Exception ex) {
            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failure generating the global view", ex);
            }
        }
        return global;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -