📄 glacierimpl.java
字号:
FragmentAndManifest fam = (FragmentAndManifest) o; sendMessage( null, new GlacierDataMessage(grm.getUID(), thisKey, fam.fragment, fam.manifest, getLocalNodeHandle(), grm.getSource().getId(), true, tagHandoff), grm.getSource() ); } else { if (logger.level <= Logger.WARNING) { logger.log("Handoff failed; fragment " + thisKey + " not found in fragment store"); } } } public void receiveException(Exception e) { if (logger.level <= Logger.WARNING) { logger.logException("Handoff failed; exception while fetching " + thisKey + ", e=", e); } } }); } } else { if (logger.level <= Logger.FINE) { logger.log("Ignoring fragment " + thisKey + " (haveIt=" + grm.getHaveIt(i) + ", authoritative=" + grm.getAuthoritative(i) + ")"); } } } } else if (o instanceof GlacierDataMessage) { final GlacierDataMessage gdm = (GlacierDataMessage) o; for (int i = 0; i < gdm.numKeys(); i++) { final FragmentKey thisKey = gdm.getKey(i); final Fragment thisFragment = gdm.getFragment(i); final Manifest thisManifest = gdm.getManifest(i); if ((thisFragment != null) && (thisManifest != null)) { if (logger.level <= Logger.INFO) { logger.log("Handoff: Received Fragment+Manifest for " + thisKey); } if (!responsibleRange.containsId(getFragmentLocation(thisKey))) { if (logger.level <= Logger.WARNING) { logger.log("Handoff: Not responsible for " + thisKey + " (at " + getFragmentLocation(thisKey) + ") -- discarding"); } continue; } if (!policy.checkSignature(thisManifest, thisKey.getVersionKey())) { if (logger.level <= Logger.WARNING) { logger.log("Handoff: Manifest is not signed properly"); } continue; } if (!thisManifest.validatesFragment(thisFragment, thisKey.getFragmentID(), environment.getLogManager().getLogger(Manifest.class, instance))) { if (logger.level <= Logger.WARNING) { logger.log("Handoff: Manifest does not validate this fragment"); } continue; } if (!fragmentStorage.exists(thisKey)) { if (logger.level <= Logger.FINE) { logger.log("Handoff: Verified ok. Storing locally."); } FragmentAndManifest fam = new FragmentAndManifest(thisFragment, thisManifest); fragmentStorage.store(thisKey, new FragmentMetadata(thisManifest.getExpiration(), 0, environment.getTimeSource().currentTimeMillis()), fam, new Continuation() { public void receiveResult(Object o) { if (logger.level <= Logger.INFO) { logger.log("Handoff: Stored OK, sending receipt: " + thisKey); } sendMessage( null, new GlacierResponseMessage(gdm.getUID(), thisKey, true, thisManifest.getExpiration(), responsibleRange.containsId(getFragmentLocation(thisKey)), getLocalNodeHandle(), gdm.getSource().getId(), true, tagHandoff), gdm.getSource() ); } public void receiveException(Exception e) { if (logger.level <= Logger.WARNING) { logger.log("Handoff: receiveException(" + e + ") while storing a fragment -- unexpected, ignored (key=" + thisKey + ")"); } } } ); } else { if (logger.level <= Logger.WARNING) { logger.log("Handoff: We already have a fragment with this key! -- sending response"); } sendMessage( null, new GlacierResponseMessage(gdm.getUID(), thisKey, true, thisManifest.getExpiration(), true, getLocalNodeHandle(), gdm.getSource().getId(), true, tagHandoff), gdm.getSource() ); continue; } continue; } else { if (logger.level <= Logger.WARNING) { logger.log("Handoff: Either fragment or manifest are missing!"); } continue; } } } else { if (logger.level <= Logger.WARNING) { logger.log("Unexpected response in handoff continuation: " + o + " -- ignored"); } } } public void receiveException(Exception e) { if (logger.level <= Logger.WARNING) { logger.logException("Exception in handoff continuation: ", e); } } public void timeoutExpired() { nextTimeout = environment.getTimeSource().currentTimeMillis() + jitterTerm(handoffInterval); if (logger.level <= Logger.INFO) { logger.log("Checking fragment storage for fragments to hand off..."); } if (logger.level <= Logger.FINE) { logger.log("Currently responsible for: " + responsibleRange); } Iterator iter = fragmentStorage.scan().getIterator(); Vector handoffs = new Vector(); Id destination = null; while (iter.hasNext()) { FragmentKey fkey = (FragmentKey) iter.next(); Id thisPos = getFragmentLocation(fkey); if (!responsibleRange.containsId(thisPos)) { if (logger.level <= Logger.FINE) { logger.log("Must hand off " + fkey + " @" + thisPos); } handoffs.add(fkey); if (handoffs.size() >= handoffMaxFragments) { if (logger.level <= Logger.FINE) { logger.log("Limit of " + handoffMaxFragments + " reached for handoff"); } break; } if (destination == null) { destination = thisPos; } } } if (destination == null) { if (logger.level <= Logger.FINE) { logger.log("Nothing to hand off -- returning"); } return; } int numHandoffs = Math.min(handoffs.size(), handoffMaxFragments); if (logger.level <= Logger.INFO) { logger.log("Handing off " + numHandoffs + " fragments (out of " + handoffs.size() + ")"); } FragmentKey[] keys = new FragmentKey[numHandoffs]; for (int i = 0; i < numHandoffs; i++) { keys[i] = (FragmentKey) handoffs.elementAt(i); } sendMessage( destination, new GlacierQueryMessage(getMyUID(), keys, getLocalNodeHandle(), destination, tagHandoff), null ); } }); /* * Garbage collection */ addContinuation( new GlacierContinuation() { long nextTimeout; public long getTimeout() { return nextTimeout; } public String toString() { return "Garbage collector"; } public void init() { nextTimeout = environment.getTimeSource().currentTimeMillis() + garbageCollectionInterval; } public void receiveResult(Object o) { if (logger.level <= Logger.WARNING) { logger.log("GC received object: " + o); } } public void receiveException(Exception e) { if (logger.level <= Logger.WARNING) { logger.logException("GC received exception: ", e); } } public void timeoutExpired() { nextTimeout = environment.getTimeSource().currentTimeMillis() + garbageCollectionInterval; final long now = environment.getTimeSource().currentTimeMillis(); IdSet fragments = fragmentStorage.scan(); int doneSoFar = 0; int candidates = 0; if (logger.level <= Logger.INFO) { logger.log("Garbage collection started at " + now + ", scanning " + fragments.numElements() + " fragment(s)..."); } Iterator iter = fragments.getIterator(); while (iter.hasNext()) { final Id thisKey = (Id) iter.next(); final FragmentMetadata metadata = (FragmentMetadata) fragmentStorage.getMetadata(thisKey); if (metadata != null) { if (metadata.getCurrentExpiration() < now) { candidates++; if (doneSoFar < garbageCollectionMaxFragmentsPerRun) { doneSoFar++; deleteFragment(thisKey, new Continuation() { public void receiveResult(Object o) { if (logger.level <= Logger.INFO) { logger.log("GC collected " + thisKey.toStringFull() + ", expired " + (now - metadata.getCurrentExpiration()) + " msec ago"); } } public void receiveException(Exception e) { if (logger.level <= Logger.FINE) { logger.log("GC cannot collect " + thisKey.toStringFull()); } } }); } } } else { if (logger.level <= Logger.WARNING) { logger.log("GC cannot read metadata in object " + thisKey.toStringFull() + ", storage returned null"); } } } if (logger.level <= Logger.INFO) { logger.log("Garbage collection completed at " + environment.getTimeSource().currentTimeMillis()); } if (logger.level <= Logger.INFO) { logger.log("Found " + candidates + " candidate(s), collected " + doneSoFar); } } }); /* * Local scan */ addContinuation( new GlacierContinuation() { long nextTimeout; public long getTimeout() { return nextTimeout; } public String toString() { return "Local scan"; } public void init() { nextTimeout = environment.getTimeSource().currentTimeMillis() + localScanInterval; } public void receiveResult(Object o) { if (logger.level <= Logger.WARNING) { logger.log("Local scan received object: " + o); } } public void receiveException(Exception e) { if (logger.level <= Logger.WARNING) { logger.logException("Local scan received exception: ", e); } } public void timeoutExpired() { nextTimeout = environment.getTimeSource().currentTimeMillis() + jitterTerm(localScanInterval); final IdSet fragments = fragmentStorage.scan(); final long now = environment.getTimeSource().currentTimeMillis(); java.util.TreeSet queries = new java.util.TreeSet(); if (logger.level <= Logger.INFO) { logger.log("Performing local scan over " + fragments.numElements() + " fragment(s)..."); } Iterator iter = fragments.getIterator(); while (iter.hasNext()) { final FragmentKey thisKey = (FragmentKey) iter.next(); FragmentMetadata metadata = (FragmentMetadata) fragmentStorage.getMetadata(thisKey); if ((metadata != null) && (metadata.currentExpirationDate >= now)) { final Id thisObjectKey = thisKey.getVersionKey().getId(); final long thisVersion = thisKey.getVersionKey().getVersion(); final int thisFragmentID = thisKey.getFragmentID(); final int fidLeft = (thisFragmentID + numFragments - 1) % numFragments; final int fidRight = (thisFragmentID + 1) % numFragments; if (responsibleRange.containsId(getFragmentLocation(thisObjectKey, fidLeft, thisVersion))) { if (!fragments.isMemberId(thisKey.getPeerKey(fidLeft))) { if (logger.level <= Logger.FINER) { logger.log("Missing: " + thisKey + " L=" + fidLeft); } queries.add(thisKey.getVersionKey()); } } if (responsibleRange.containsId(getFragmentLocation(thisObjectKey, fidRight, thisVersion))) { if (!fragments.isMemberId(thisKey.getPeerKey(fidRight))) { if (logger.level <= Logger.FINER) { logger.log("Missing: " + thisKey + " R=" + fidRight); } queries.add(thisKey.getVersionKey()); } } } else { if (logger.level <= Logger.FINER) { logger.log("Expired, ignoring in local scan: " + thisKey); } } } if (!queries.isEmpty()) { if (logger.level <= Logger.INFO) { logger.log("Local scan completed; " + queries.size() + " objects incomplete in local store"); } iter = queries.iterator(); int queriesSent = 0; while (iter.hasNext() && (queriesSent < localScanMaxFragmentsPerRun)) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -