📄 gcpastimpl.java
字号:
logger.log("REFRESH: ON PROCESSING THREAD!"); } for (int i = 0; i < array.length; i++) { GCId id = (GCId) array[i]; NodeHandleSet replicas = endpoint.replicaSet(id.getId(), replicationFactor + 1, set.getHandle(set.size() - 1), set); // if we have all of the replicas, go ahead and refresh this item if ((replicas != null) && ((replicas.size() == set.size()) || (replicas.size() == replicationFactor + 1))) { for (int j = 0; j < replicas.size(); j++) { map.addReplica(replicas.getHandle(j), id); } refreshed++; ids.removeId(id); } } if (logger.level <= Logger.FINE) { logger.log("REFRESH: DONE WITH PROCESSING THREAD - MOVING TO NORMAL THREAD!"); } return null; } }, new StandardContinuation(parent) { public void receiveResult(Object o) { if (logger.level <= Logger.FINE) { logger.log("REFRESH: BACK ON NORMAL THREAD!"); } final Iterator iterator = map.getReplicas(); Continuation send = new StandardContinuation(parent) { public void receiveResult(Object o) { if (iterator.hasNext()) { NodeHandle next = (NodeHandle) iterator.next(); GCIdSet ids = map.getIds(next); if (logger.level <= Logger.FINE) { logger.log("REFRESH: SENDING REQUEST TO " + next + " FOR IDSET " + ids); } sendRequest(next, new GCRefreshMessage(getUID(), ids, getLocalNodeHandle(), next.getId()), new NamedContinuation("GCRefresh to " + next, this)); } else { if (logger.level <= Logger.FINE) { logger.log("REFRESH: DONE SENDING REQUESTS, RECURSING"); } refresh(ids, parent); } } public void receiveException(Exception e) { if (logger.level <= Logger.FINE) { logger.log("GOT EXCEPTION " + e + " REFRESHING ITEMS - CONTINUING"); } receiveResult(null); } }; send.receiveResult(null); } }); } }); } /** * This method is invoked on applications when the underlying node is about to * forward the given message with the provided target to the specified next * hop. Applications can change the contents of the message, specify a * different nextHop (through re-routing), or completely terminate the * message. * * @param message The message being sent, containing an internal message along * with a destination key and nodeHandle next hop. * @return Whether or not to forward the message further */ public boolean forward(final RouteMessage message) { try { if (message.getMessage(endpoint.getDeserializer()) instanceof GCLookupHandlesMessage) { return true; } else { return super.forward(message); } } catch (IOException ioe) { throw new RuntimeException(ioe); } } /** * This method is called on the application at the destination node for the * given id. * * @param id The destination id of the message * @param message The message being sent */ public void deliver(Id id, Message message) { final PastMessage msg = (PastMessage) message; if (msg.isResponse()) { super.deliver(id, message); } else { if (msg instanceof GCInsertMessage) { final GCInsertMessage imsg = (GCInsertMessage) msg; inserts++; // make sure the policy allows the insert if (policy.allowInsert(imsg.getContent())) { Id theId = imsg.getContent().getId(); if (theId == null) { if (logger.level <= Logger.SEVERE) { logger.log("Error: null Id from " + imsg.getContent() + " from " + imsg + " in " + this); } } storage.getObject(theId, new StandardContinuation(getResponseContinuation(msg)) { public void receiveResult(Object o) { try { // allow the object to check the insert, and then insert the data GCPastContent content = (GCPastContent) imsg.getContent().checkInsert(imsg.getContent().getId(), (PastContent) o); storage.store(content.getId(), content.getMetadata(imsg.getExpiration()), content, parent); } catch (PastException e) { parent.receiveException(e); } } }); } else { getResponseContinuation(msg).receiveResult(new Boolean(false)); } } else if (msg instanceof GCRefreshMessage) { final GCRefreshMessage rmsg = (GCRefreshMessage) msg; final Iterator i = Arrays.asList(rmsg.getKeys()).iterator(); final Vector result = new Vector(); other += rmsg.getKeys().length; StandardContinuation process = new StandardContinuation(getResponseContinuation(msg)) { public void receiveResult(Object o) { if (o != null) { result.addElement(o); } if (i.hasNext()) { final GCId id = (GCId) i.next(); /* * skip the object if we don't have it yet */ if (storage.exists(id.getId())) { GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(id.getId()); if (metadata != null) { /* * only allow the lifetime to be extended, otherwise skip */ if (metadata.getExpiration() < id.getExpiration()) { storage.setMetadata(id.getId(), metadata.setExpiration(id.getExpiration()), this); } else { receiveResult(Boolean.FALSE); } } else { storage.getObject(id.getId(), new StandardContinuation(this) { public void receiveResult(Object o) { storage.setMetadata(id.getId(), ((GCPastContent) o).getMetadata(id.getExpiration()), parent); } }); } } else { /* * but first check and see if it's in the trash, so we can uncollect it */ if (trash != null) { trash.getObject(id.getId(), new StandardContinuation(this) { public void receiveResult(Object o) { if ((o != null) && (o instanceof GCPastContent)) { if (logger.level <= Logger.FINE) { logger.log( "GCREFRESH: Restoring object " + id + " from trash!"); } GCPastContent content = (GCPastContent) o; storage.store(id.getId(), content.getMetadata(id.getExpiration()), content, new StandardContinuation(parent) { public void receiveResult(Object o) { trash.unstore(id.getId(), parent); } }); } else { parent.receiveResult(Boolean.FALSE); } } }); } else { receiveResult(Boolean.FALSE); } } } else { parent.receiveResult(result.toArray(new Boolean[0])); } } }; process.receiveResult(null); } else if (msg instanceof GCLookupHandlesMessage) { GCLookupHandlesMessage lmsg = (GCLookupHandlesMessage) msg; NodeHandleSet set = endpoint.neighborSet(lmsg.getMax()); set.removeHandle(getLocalNodeHandle().getId()); set.putHandle(getLocalNodeHandle()); if (logger.level <= Logger.FINER) { logger.log("Returning neighbor set " + set + " for lookup handles of id " + lmsg.getId() + " max " + lmsg.getMax() + " at " + endpoint.getId()); } getResponseContinuation(msg).receiveResult(set); } else if (msg instanceof GCCollectMessage) { // get all ids which expiration before now collect(storage.scanMetadataValuesHead(new GCPastMetadata(environment.getTimeSource().currentTimeMillis())), new ListenerContinuation("Removal of expired ids", environment) { public void receiveResult(Object o) { if (environment.getTimeSource().currentTimeMillis() > DEFAULT_EXPIRATION) { collect(storage.scanMetadataValuesNull(), new ListenerContinuation("Removal of default expired ids", environment)); } } }); } else if (msg instanceof FetchHandleMessage) { final FetchHandleMessage fmsg = (FetchHandleMessage) msg; fetchHandles++; storage.getObject(fmsg.getId(), new StandardContinuation(getResponseContinuation(msg)) { public void receiveResult(Object o) { GCPastContent content = (GCPastContent) o; if (content != null) { if (logger.level <= Logger.FINE) { logger.log("Retrieved data for fetch handles of id " + fmsg.getId()); } GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(fmsg.getId()); if (metadata != null) { parent.receiveResult(content.getHandle(GCPastImpl.this, metadata.getExpiration())); } else { parent.receiveResult(content.getHandle(GCPastImpl.this, DEFAULT_EXPIRATION)); } } else { parent.receiveResult(null); } } }); } else { super.deliver(id, message); } } } /** * Internal method which collects all of the objects in the given set * * @param command The command to call once done * @param map DESCRIBE THE PARAMETER */ protected void collect(SortedMap map, Continuation command) { final Iterator i = map.keySet().iterator(); Continuation remove = new StandardContinuation(command) { public void receiveResult(Object o) { if (i.hasNext()) { final Id gid = (Id) i.next(); GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(gid); collected++; if (trash != null) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -