📄 pastimpl.java
字号:
Message internal; try { internal = message.getMessage(endpoint.getDeserializer()); } catch (IOException ioe) { throw new RuntimeException(ioe); } if (internal instanceof LookupMessage) { final LookupMessage lmsg = (LookupMessage) internal; Id id = lmsg.getId(); // if it is a request, look in the cache if (!lmsg.isResponse()) { if (logger.level <= Logger.FINER) { logger.log("Lookup message " + lmsg + " is a request; look in the cache"); } if (storage.exists(id)) { // deliver the message, which will do what we want if (logger.level <= Logger.FINE) { logger.log("Request for " + id + " satisfied locally - responding"); } deliver(endpoint.getId(), lmsg); return false; } } } else if (internal instanceof LookupHandlesMessage) { LookupHandlesMessage lmsg = (LookupHandlesMessage) internal; if (!lmsg.isResponse()) { if (endpoint.replicaSet(lmsg.getId(), lmsg.getMax()).size() == lmsg.getMax()) { if (logger.level <= Logger.FINE) { logger.log("Hijacking lookup handles request for " + lmsg.getId()); } deliver(endpoint.getId(), lmsg); return false; } } } return true; } /** * This method is called on the application at the destination node for the * given id. 
*
   * Responses are handed to handleResponse. Requests are dispatched on their
   * concrete message type (insert, lookup, lookup handles, fetch, fetch handle,
   * cache); a request of any other type is only logged at SEVERE level.
   *
   * @param id The destination id of the message
   * @param message The message being sent; it is cast to PastMessage
   */
  public void deliver(Id id, Message message) {
    final PastMessage msg = (PastMessage) message;

    if (msg.isResponse()) {
      handleResponse(msg);
      return;
    }

    if (logger.level <= Logger.INFO) {
      logger.log("Received message " + message + " with destination " + id);
    }

    if (msg instanceof InsertMessage) {
      final InsertMessage imsg = (InsertMessage) msg;

      // make sure the policy allows the insert
      if (policy.allowInsert(imsg.getContent())) {
        inserts++;
        storage.getObject(imsg.getContent().getId(),
          new StandardContinuation(getResponseContinuation(msg)) {
            public void receiveResult(Object o) {
              try {
                // allow the object to check the insert against whatever is
                // currently stored (o), and then insert the data
                PastContent content =
                  imsg.getContent().checkInsert(imsg.getContent().getId(), (PastContent) o);
                storage.store(imsg.getContent().getId(), null, content, parent);
              } catch (PastException e) {
                parent.receiveException(e);
              }
            }
          });
      } else {
        // policy rejected the insert: answer with a shared FALSE constant
        getResponseContinuation(msg).receiveResult(Boolean.FALSE);
      }
    } else if (msg instanceof LookupMessage) {
      final LookupMessage lmsg = (LookupMessage) msg;
      lookups++;

      // if the data is here, we send the reply, as well as push a cached copy
      // back to the previous node
      storage.getObject(lmsg.getId(),
        new StandardContinuation(getResponseContinuation(lmsg)) {
          public void receiveResult(Object o) {
            if (logger.level <= Logger.FINE) {
              logger.log("Received object " + o + " for id " + lmsg.getId());
            }

            // send result back
            parent.receiveResult(o);

            // if possible, pushed copy into previous hop cache
            if ((lmsg.getPreviousNodeHandle() != null) &&
                (o != null) &&
                (!((PastContent) o).isMutable())) {
              NodeHandle handle = lmsg.getPreviousNodeHandle();
              if (logger.level <= Logger.FINE) {
                logger.log("Pushing cached copy of " + ((PastContent) o).getId() + " to " + handle);
              }

              // NOTE(review): the message is still built, but the route call
              // below is commented out, so nothing is actually pushed.
              CacheMessage cmsg =
                new CacheMessage(getUID(), (PastContent) o, getLocalNodeHandle(), handle.getId());
              //endpoint.route(null, cmsg, handle);
            }
          }
        });
    } else if (msg instanceof LookupHandlesMessage) {
      LookupHandlesMessage lmsg = (LookupHandlesMessage) msg;
      NodeHandleSet set = endpoint.replicaSet(lmsg.getId(), lmsg.getMax());
      if (logger.level <= Logger.FINER) {
        logger.log("Returning replica set " + set + " for lookup handles of id " + lmsg.getId() +
          " max " + lmsg.getMax() + " at " + endpoint.getId());
      }
      getResponseContinuation(msg).receiveResult(set);
    } else if (msg instanceof FetchMessage) {
      FetchMessage fmsg = (FetchMessage) msg;
      lookups++;

      // has to be special to determine how to send the message
      Continuation c = getFetchResponseContinuation(msg);
      storage.getObject(fmsg.getHandle().getId(), c);
    } else if (msg instanceof FetchHandleMessage) {
      final FetchHandleMessage fmsg = (FetchHandleMessage) msg;
      fetchHandles++;

      storage.getObject(fmsg.getId(),
        new StandardContinuation(getResponseContinuation(msg)) {
          public void receiveResult(Object o) {
            PastContent content = (PastContent) o;

            if (content != null) {
              if (logger.level <= Logger.FINE) {
                logger.log("Retrieved data for fetch handles of id " + fmsg.getId());
              }
              parent.receiveResult(content.getHandle(PastImpl.this));
            } else {
              // nothing stored under this id: answer with null
              parent.receiveResult(null);
            }
          }
        });
    } else if (msg instanceof CacheMessage) {
      cache(((CacheMessage) msg).getContent());
    } else {
      if (logger.level <= Logger.SEVERE) {
        logger.log("ERROR - Received message " + msg + " of unknown type.");
      }
    }
  }

  /**
   * This method is invoked to inform the application that the given node has
   * either joined or left the neighbor set of the local node, as the set would
   * be returned by the neighborSet call. This implementation ignores the
   * notification.
   *
   * @param handle The handle that has joined/left
   * @param joined Whether the node has joined or left
   */
  public void update(NodeHandle handle, boolean joined) {
  }

  // ----- REPLICATION MANAGER METHODS -----

  /**
   * This upcall is invoked to tell the client to fetch the given id, and to
   * call the given command with the boolean result once the fetch is completed.
   * The client *MUST* call the command at some point in the future, as the
   * manager waits for the command to return before continuing.
   *
   * The fetch itself is delegated to the policy. If the policy yields null, or
   * an object that is not a PastContent, the command receives Boolean.FALSE;
   * otherwise the object is stored and the command is handed to the store call
   * as its continuation.
   *
   * @param id The id to fetch
   * @param hint A node handle passed through unchanged to the policy's fetch
   * @param command The continuation to notify once the fetch has finished
   */
  public void fetch(final Id id, NodeHandle hint, Continuation command) {
    if (logger.level <= Logger.FINER) {
      logger.log("Sending out replication fetch request for the id " + id);
    }

    policy.fetch(id, hint, backup, this,
      new StandardContinuation(command) {
        public void receiveResult(Object o) {
          if (o == null) {
            if (logger.level <= Logger.WARNING) {
              logger.log("Could not fetch id " + id + " - policy returned null in namespace " + instance);
            }
            parent.receiveResult(Boolean.FALSE);
            return;
          }

          if (logger.level <= Logger.FINEST) {
            logger.log("inserting replica of id " + id);
          }

          if (!(o instanceof PastContent)) {
            if (logger.level <= Logger.WARNING) {
              logger.log("ERROR! Not PastContent " + o.getClass().getName() + " " + o);
            }
            // Report the failure instead of falling through to a cast that
            // would throw ClassCastException and leave the command uncalled.
            parent.receiveResult(Boolean.FALSE);
            return;
          }

          PastContent content = (PastContent) o;
          storage.getStorage().store(content.getId(), null, content, parent);
        }
      });
  }

  /**
   * This upcall is to notify the client that the given id can be safely removed
   * from the storage. The client may choose to perform advanced behavior, such
   * as caching the object, or may simply delete it.
   *
   * When a backup cache is configured, the object is first read from storage
   * and placed in the backup cache (together with its metadata) before being
   * unstored; otherwise it is unstored directly.
   *
   * @param id The id to remove
   * @param command The continuation handed to the final unstore call
   */
  public void remove(final Id id, Continuation command) {
    if (backup == null) {
      storage.unstore(id, command);
      return;
    }

    storage.getObject(id,
      new StandardContinuation(command) {
        public void receiveResult(Object o) {
          backup.cache(id, storage.getMetadata(id), (Serializable) o,
            new StandardContinuation(parent) {
              public void receiveResult(Object cached) {
                // the backup copy has been made; now drop the primary copy
                storage.unstore(id, parent);
              }
            });
        }
      });
  }

  /**
   * This upcall should return the set of keys that the application currently
   * stores in this range. Should return a empty IdSet (not null), in the case
   * that no keys belong to this range.
   *
   * @param range the requested range
   * @return The result of scanning the underlying storage for the given range
   */
  public IdSet scan(IdRange range) {
    return storage.getStorage().scan(range);
  }

  /**
   * This upcall should return the set of keys that the application currently
   * stores. Should return a empty IdSet (not null), in the case that no keys
   * belong to this range.
   *
   * @return The result of scanning the whole underlying storage
   */
  public IdSet scan() {
    return storage.getStorage().scan();
  }

  /**
   * This upcall should return whether or not the given id is currently stored
   * by the client.
   *
   * @param id The id in question
   * @return Whether or not the id exists
   */
  public boolean exists(Id id) {
    return storage.getStorage().exists(id);
  }

  /**
   * Checks whether a handle for the given id can be found through
   * lookupHandles, asking for replicationFactor + 1 handles. The command
   * receives Boolean.TRUE as soon as one element of the result array is a
   * PastContentHandle, and Boolean.FALSE if none is.
   *
   * @param id The id to look for
   * @param command The continuation that receives the Boolean answer
   */
  public void existsInOverlay(Id id, Continuation command) {
    lookupHandles(id, replicationFactor + 1,
      new StandardContinuation(command) {
        public void receiveResult(Object result) {
          Object[] results = (Object[]) result;
          for (int i = 0; i < results.length; i++) {
            if (results[i] instanceof PastContentHandle) {
              parent.receiveResult(Boolean.TRUE);
              return;
            }
          }
          parent.receiveResult(Boolean.FALSE);
        }
      });
  }

  /**
   * Reads the object stored under the given id from storage and passes it to
   * insert. The command receives Boolean.TRUE if at least one entry of the
   * Boolean array produced by the insert is true, and Boolean.FALSE otherwise.
   *
   * @param id The id of the stored object to insert again
   * @param command The continuation that receives the Boolean outcome
   */
  public void reInsert(Id id, Continuation command) {
    storage.getObject(id,
      new StandardContinuation(command) {
        public void receiveResult(final Object o) {
          insert((PastContent) o,
            new StandardContinuation(parent) {
              public void receiveResult(Object result) {
                Boolean[] results = (Boolean[]) result;
                for (int i = 0; i < results.length; i++) {
                  if (results[i].booleanValue()) {
                    parent.receiveResult(Boolean.TRUE);
                    return;
                  }
                }
                parent.receiveResult(Boolean.FALSE);
              }
            });
        }
      });
  }

  /**
   * Deserializer for the PAST message types handled by this class. It maps the
   * numeric message type onto the matching message's build method.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  protected class PastDeserializer implements MessageDeserializer {
    /**
     * Builds the message identified by type from the given buffer.
     *
     * @param buf The buffer the message is read from
     * @param type The message type, compared against each message's TYPE constant
     * @param priority Not used by this implementation
     * @param sender Not used by this implementation
     * @return The message built from the buffer
     * @exception IOException If building the message fails; the exception is
     *            logged at SEVERE level and rethrown
     * @exception IllegalArgumentException If the type matches none of the
     *            known message types
     */
    public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException {
      try {
        switch (type) {
          case CacheMessage.TYPE:
            return CacheMessage.build(buf, endpoint, contentDeserializer);
          case FetchHandleMessage.TYPE:
            return FetchHandleMessage.build(buf, endpoint, contentHandleDeserializer);
          case FetchMessage.TYPE:
            return FetchMessage.build(buf, endpoint, contentDeserializer, contentHandleDeserializer);
          case InsertMessage.TYPE:
            return InsertMessage.build(buf, endpoint, contentDeserializer);
          case LookupHandlesMessage.TYPE:
            return LookupHandlesMessage.build(buf, endpoint);
          case LookupMessage.TYPE:
            return LookupMessage.build(buf, endpoint, contentDeserializer);
        }
      } catch (IOException e) {
        if (logger.level <= Logger.SEVERE) {
          logger.log("Exception in deserializer in " + PastImpl.this.endpoint.toString() + ":" + instance + " " + e);
        }
        throw e;
      }

      // no case matched: the type is not one this deserializer knows about
      throw new IllegalArgumentException("Unknown type:" + type + " in " + PastImpl.this.toString());
    }
  }

  /**
   * Class which builds a message
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  public interface MessageBuilder {
    /**
     * Builds a new message.
     *
     * @return The newly built PastMessage
     */
    public PastMessage buildMessage();
  }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -