📄 pastimpl.java
字号:
CancellableTask timer = (CancellableTask) timers.remove(new Integer(uid)); if (timer != null) { timer.cancel(); } return (Continuation) outstanding.remove(new Integer(uid)); } /** * Handles the response message from a request. * * @param message The message that arrived */ private void handleResponse(PastMessage message) { if (logger.level <= Logger.FINE) { logger.log("handling reponse message " + message + " from the request"); } Continuation command = removePending(message.getUID()); if (command != null) { message.returnResponse(command, environment, instance); } } /** * Method which inserts the given object into the cache * * @param content The content to cache */ private void cache(final PastContent content) { cache(content, new ListenerContinuation("Caching of " + content, environment)); } /** * Method which inserts the given object into the cache * * @param content The content to cache * @param command The command to run once done */ public void cache(final PastContent content, final Continuation command) { if (logger.level <= Logger.FINER) { logger.log("Inserting PastContent object " + content + " into cache"); } if ((content != null) && (!content.isMutable())) { storage.cache(content.getId(), null, content, command); } else { command.receiveResult(new Boolean(true)); } } /** * Internal method which actually performs an insert for a given object. Here * so that subclasses can override the types of insert messages which are sent * across the wire. * * @param builder The object which builds the messages * @param command The command to call once done * @param id DESCRIBE THE PARAMETER * @param useSocket DESCRIBE THE PARAMETER */ protected void doInsert(final Id id, final MessageBuilder builder, Continuation command, final boolean useSocket) { // first, we get all of the replicas for this id getHandles(id, replicationFactor + 1, new StandardContinuation(command) { public void receiveResult(Object o) { NodeHandleSet replicas = (NodeHandleSet) o; if (logger.level <= Logger.FINER) { logger.log("Received replicas " + replicas + " for id " + id); } // then we send inserts to each replica and wait for at least // threshold * num to return successfully MultiContinuation multi = new MultiContinuation(parent, replicas.size()) { public boolean isDone() throws Exception { int numSuccess = 0; for (int i = 0; i < haveResult.length; i++) { if ((haveResult[i]) && (Boolean.TRUE.equals(result[i]))) { numSuccess++; } } if (numSuccess >= (SUCCESSFUL_INSERT_THRESHOLD * haveResult.length)) { return true; } if (super.isDone()) { for (int i = 0; i < result.length; i++) { if (result[i] instanceof Exception) { if (logger.level <= Logger.WARNING) { logger.logException("result[" + i + "]:", (Exception) result[i]); } } } throw new PastException("Had only " + numSuccess + " successful inserts out of " + result.length + " - aborting."); } return false; } public Object getResult() { Boolean[] b = new Boolean[result.length]; for (int i = 0; i < b.length; i++) { b[i] = new Boolean((result[i] == null) || Boolean.TRUE.equals(result[i])); } return b; } }; for (int i = 0; i < replicas.size(); i++) { NodeHandle handle = replicas.getHandle(i); PastMessage m = builder.buildMessage(); Continuation c = new NamedContinuation("InsertMessage to " + replicas.getHandle(i) + " for " + id, multi.getSubContinuation(i)); if (useSocket) { sendViaSocket(handle, m, c); } else { sendRequest(handle, m, c); } } } }); } // ----- PAST METHODS ----- /** * Inserts an object with the given ID into this instance of Past. * Asynchronously returns a PastException to command, if the operation was * unsuccessful. If the operation was successful, a Boolean[] is returned * representing the responses from each of the replicas which inserted the * object. * * @param obj the object to be inserted * @param command Command to be performed when the result is received */ public void insert(final PastContent obj, final Continuation command) { if (logger.level <= Logger.FINER) { logger.log("Inserting the object " + obj + " with the id " + obj.getId()); } if (logger.level <= Logger.FINEST) { logger.log(" Inserting data of class " + obj.getClass().getName() + " under " + obj.getId().toStringFull()); } doInsert(obj.getId(), new MessageBuilder() { public PastMessage buildMessage() { return new InsertMessage(getUID(), obj, getLocalNodeHandle(), obj.getId()); } }, new StandardContinuation(command) { public void receiveResult(final Object array) { cache(obj, new SimpleContinuation() { public void receiveResult(Object o) { parent.receiveResult(array); } }); } }, socketStrategy.sendAlongSocket(SocketStrategy.TYPE_INSERT, obj)); } /** * Retrieves the object stored in this instance of Past with the given ID. * Asynchronously returns a PastContent object as the result to the provided * Continuation, or a PastException. This method is provided for convenience; * its effect is identical to a lookupHandles() and a subsequent fetch() to * the handle that is nearest in the network. The client must authenticate the * object. In case of failure, an alternate replica of the object can be * obtained via lookupHandles() and fetch(). This method is not safe if the * object is immutable and storage nodes are not trusted. In this case, * clients should used the lookUpHandles method to obtains the handles of all * primary replicas and determine which replica is fresh in an * application-specific manner. * * @param id the key to be queried * @param command Command to be performed when the result is received */ public void lookup(final Id id, final Continuation command) { lookup(id, true, command); } /** * Method which performs the same as lookup(), but allows the callee to * specify if the data should be cached. * * @param id the key to be queried * @param cache Whether or not the data should be cached * @param command Command to be performed when the result is received */ public void lookup(final Id id, final boolean cache, final Continuation command) { if (logger.level <= Logger.FINER) { logger.log(" Performing lookup on " + id.toStringFull()); } storage.getObject(id, new StandardContinuation(command) { public void receiveResult(Object o) { if (o != null) { command.receiveResult(o); } else { // send the request across the wire, and see if the result is null or not sendRequest(id, new LookupMessage(getUID(), id, getLocalNodeHandle(), id), new NamedContinuation("LookupMessage for " + id, this) { public void receiveResult(final Object o) { // if we have an object, we return it // otherwise, we must check all replicas in order to make sure that // the object doesn't exist anywhere if (o != null) { // lastly, try and cache object locally for future use if (cache) { cache((PastContent) o, new SimpleContinuation() { public void receiveResult(Object object) { command.receiveResult(o); } }); } else { command.receiveResult(o); } } else { lookupHandles(id, replicationFactor + 1, new Continuation() { public void receiveResult(Object o) { PastContentHandle[] handles = (PastContentHandle[]) o; for (int i = 0; i < handles.length; i++) { if (handles[i] != null) { fetch(handles[i], new StandardContinuation(parent) { public void receiveResult(final Object o) { // lastly, try and cache object locally for future use if (cache) { cache((PastContent) o, new SimpleContinuation() { public void receiveResult(Object object) { command.receiveResult(o); } }); } else { command.receiveResult(o); } } }); return; } } // there were no replicas of the object command.receiveResult(null); } public void receiveException(Exception e) { command.receiveException(e); } }); } } public void receiveException(Exception e) { // If the lookup message failed , we then try to fetch all of the handles, just // in case. This may fail too, but at least we tried. receiveResult(null); } }); } } }); } /** * Retrieves the handles of up to max replicas of the object stored in this * instance of Past with the given ID. Asynchronously returns an array of * PastContentHandles as the result to the provided Continuation, or a * PastException. Each replica handle is obtained from a different primary * storage root for the the given key. If max exceeds the replication factor r * of this Past instance, only r replicas are returned. This method will * return a PastContentHandle[] array containing all of the handles. * * @param id the key to be queried * @param max the maximal number of replicas requested * @param command Command to be performed when the result is received */ public void lookupHandles(final Id id, int max, final Continuation command) { if (logger.level <= Logger.FINE) { logger.log("Retrieving handles of up to " + max + " replicas of the object stored in Past with id " + id); } if (logger.level <= Logger.FINER) { logger.log("Fetching up to " + max + " handles of " + id.toStringFull()); } getHandles(id, max, new StandardContinuation(command) { public void receiveResult(Object o) { NodeHandleSet replicas = (NodeHandleSet) o; if (logger.level <= Logger.FINER) { logger.log("Receiving replicas " + replicas + " for lookup Id " + id); } MultiContinuation multi = new MultiContinuation(parent, replicas.size()) { public Object getResult() { PastContentHandle[] p = new PastContentHandle[result.length]; for (int i = 0; i < result.length; i++) { if (result[i] instanceof PastContentHandle) { p[i] = (PastContentHandle) result[i]; } } return p; } }; for (int i = 0; i < replicas.size(); i++) { lookupHandle(id, replicas.getHandle(i), multi.getSubContinuation(i)); } } }); } /** * Retrieves the handle for the given object stored on the requested node. * Asynchronously returns a PostContentHandle (or null) to the provided * continuation. * * @param id the key to be queried * @param handle The node on which the handle is requested * @param command Command to be performed when the result is received */ public void lookupHandle(Id id, NodeHandle handle, Continuation command) { if (logger.level <= Logger.FINE) { logger.log("Retrieving handle for id " + id + " from node " + handle); } sendRequest(handle, new FetchHandleMessage(getUID(), id, getLocalNodeHandle(), handle.getId()), new NamedContinuation("FetchHandleMessage to " + handle + " for " + id, command)); } /** * Retrieves the object associated with a given content handle. Asynchronously * returns a PastContent object as the result to the provided Continuation, or * a PastException. The client must authenticate the object. In case of * failure, an alternate replica can be obtained using a different handle * obtained via lookupHandles(). * * @param command Command to be performed when the result is received * @param handle DESCRIBE THE PARAMETER */ public void fetch(PastContentHandle handle, Continuation command) { if (logger.level <= Logger.FINE) { logger.log("Retrieving object associated with content handle " + handle); } if (logger.level <= Logger.FINER) { logger.log("Fetching object under id " + handle.getId().toStringFull() + " on " + handle.getNodeHandle()); } NodeHandle han = handle.getNodeHandle(); sendRequest(han, new FetchMessage(getUID(), handle, getLocalNodeHandle(), han.getId()), new NamedContinuation("FetchMessage to " + handle.getNodeHandle() + " for " + handle.getId(), command)); } // ----- COMMON API METHODS ----- /** * This method is invoked on applications when the underlying node is about to * forward the given message with the provided target to the specified next * hop. Applications can change the contents of the message, specify a * different nextHop (through re-routing), or completely terminate the * message. * * @param message The message being sent, containing an internal message along * with a destination key and nodeHandle next hop. * @return Whether or not to forward the message further */ public boolean forward(final RouteMessage message) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -