⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pastimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    CancellableTask timer = (CancellableTask) timers.remove(new Integer(uid));    if (timer != null) {      timer.cancel();    }    return (Continuation) outstanding.remove(new Integer(uid));  }  /**   * Handles the response message from a request.   *   * @param message The message that arrived   */  private void handleResponse(PastMessage message) {    if (logger.level <= Logger.FINE) {      logger.log("handling reponse message " + message + " from the request");    }    Continuation command = removePending(message.getUID());    if (command != null) {      message.returnResponse(command, environment, instance);    }  }  /**   * Method which inserts the given object into the cache   *   * @param content The content to cache   */  private void cache(final PastContent content) {    cache(content, new ListenerContinuation("Caching of " + content, environment));  }  /**   * Method which inserts the given object into the cache   *   * @param content The content to cache   * @param command The command to run once done   */  public void cache(final PastContent content, final Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log("Inserting PastContent object " + content + " into cache");    }    if ((content != null) && (!content.isMutable())) {      storage.cache(content.getId(), null, content, command);    } else {      command.receiveResult(new Boolean(true));    }  }  /**   * Internal method which actually performs an insert for a given object. Here   * so that subclasses can override the types of insert messages which are sent   * across the wire.   *   * @param builder The object which builds the messages   * @param command The command to call once done   * @param id DESCRIBE THE PARAMETER   * @param useSocket DESCRIBE THE PARAMETER   */  protected void doInsert(final Id id, final MessageBuilder builder, Continuation command, final boolean useSocket) {    // first, we get all of the replicas for this id    getHandles(id, replicationFactor + 1,      new StandardContinuation(command) {        public void receiveResult(Object o) {          NodeHandleSet replicas = (NodeHandleSet) o;          if (logger.level <= Logger.FINER) {            logger.log("Received replicas " + replicas + " for id " + id);          }          // then we send inserts to each replica and wait for at least          // threshold * num to return successfully          MultiContinuation multi =            new MultiContinuation(parent, replicas.size()) {              public boolean isDone() throws Exception {                int numSuccess = 0;                for (int i = 0; i < haveResult.length; i++) {                  if ((haveResult[i]) && (Boolean.TRUE.equals(result[i]))) {                    numSuccess++;                  }                }                if (numSuccess >= (SUCCESSFUL_INSERT_THRESHOLD * haveResult.length)) {                  return true;                }                if (super.isDone()) {                  for (int i = 0; i < result.length; i++) {                    if (result[i] instanceof Exception) {                      if (logger.level <= Logger.WARNING) {                        logger.logException("result[" + i + "]:", (Exception) result[i]);                      }                    }                  }                  throw new PastException("Had only " + numSuccess + " successful inserts out of " + result.length + " - aborting.");                }                return false;              }              public Object getResult() {                Boolean[] b = new Boolean[result.length];                for (int i = 0; i < b.length; i++) {                  b[i] = new Boolean((result[i] == null) || Boolean.TRUE.equals(result[i]));                }                return b;              }            };          for (int i = 0; i < replicas.size(); i++) {            NodeHandle handle = replicas.getHandle(i);            PastMessage m = builder.buildMessage();            Continuation c = new NamedContinuation("InsertMessage to " + replicas.getHandle(i) + " for " + id, multi.getSubContinuation(i));            if (useSocket) {              sendViaSocket(handle, m, c);            } else {              sendRequest(handle, m, c);            }          }        }      });  }  // ----- PAST METHODS -----  /**   * Inserts an object with the given ID into this instance of Past.   * Asynchronously returns a PastException to command, if the operation was   * unsuccessful. If the operation was successful, a Boolean[] is returned   * representing the responses from each of the replicas which inserted the   * object.   *   * @param obj the object to be inserted   * @param command Command to be performed when the result is received   */  public void insert(final PastContent obj, final Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log("Inserting the object " + obj + " with the id " + obj.getId());    }    if (logger.level <= Logger.FINEST) {      logger.log(" Inserting data of class " + obj.getClass().getName() + " under " + obj.getId().toStringFull());    }    doInsert(obj.getId(),      new MessageBuilder() {        public PastMessage buildMessage() {          return new InsertMessage(getUID(), obj, getLocalNodeHandle(), obj.getId());        }      },      new StandardContinuation(command) {        public void receiveResult(final Object array) {          cache(obj,            new SimpleContinuation() {              public void receiveResult(Object o) {                parent.receiveResult(array);              }            });        }      },      socketStrategy.sendAlongSocket(SocketStrategy.TYPE_INSERT, obj));  }  /**   * Retrieves the object stored in this instance of Past with the given ID.   * Asynchronously returns a PastContent object as the result to the provided   * Continuation, or a PastException. This method is provided for convenience;   * its effect is identical to a lookupHandles() and a subsequent fetch() to   * the handle that is nearest in the network. The client must authenticate the   * object. In case of failure, an alternate replica of the object can be   * obtained via lookupHandles() and fetch(). This method is not safe if the   * object is immutable and storage nodes are not trusted. In this case,   * clients should used the lookUpHandles method to obtains the handles of all   * primary replicas and determine which replica is fresh in an   * application-specific manner.   *   * @param id the key to be queried   * @param command Command to be performed when the result is received   */  public void lookup(final Id id, final Continuation command) {    lookup(id, true, command);  }  /**   * Method which performs the same as lookup(), but allows the callee to   * specify if the data should be cached.   *   * @param id the key to be queried   * @param cache Whether or not the data should be cached   * @param command Command to be performed when the result is received   */  public void lookup(final Id id, final boolean cache, final Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log(" Performing lookup on " + id.toStringFull());    }    storage.getObject(id,      new StandardContinuation(command) {        public void receiveResult(Object o) {          if (o != null) {            command.receiveResult(o);          } else {            // send the request across the wire, and see if the result is null or not            sendRequest(id, new LookupMessage(getUID(), id, getLocalNodeHandle(), id),              new NamedContinuation("LookupMessage for " + id, this) {                public void receiveResult(final Object o) {                  // if we have an object, we return it                  // otherwise, we must check all replicas in order to make sure that                  // the object doesn't exist anywhere                  if (o != null) {                    // lastly, try and cache object locally for future use                    if (cache) {                      cache((PastContent) o,                        new SimpleContinuation() {                          public void receiveResult(Object object) {                            command.receiveResult(o);                          }                        });                    } else {                      command.receiveResult(o);                    }                  } else {                    lookupHandles(id, replicationFactor + 1,                      new Continuation() {                        public void receiveResult(Object o) {                          PastContentHandle[] handles = (PastContentHandle[]) o;                          for (int i = 0; i < handles.length; i++) {                            if (handles[i] != null) {                              fetch(handles[i],                                new StandardContinuation(parent) {                                  public void receiveResult(final Object o) {                                    // lastly, try and cache object locally for future use                                    if (cache) {                                      cache((PastContent) o,                                        new SimpleContinuation() {                                          public void receiveResult(Object object) {                                            command.receiveResult(o);                                          }                                        });                                    } else {                                      command.receiveResult(o);                                    }                                  }                                });                              return;                            }                          }                          // there were no replicas of the object                          command.receiveResult(null);                        }                        public void receiveException(Exception e) {                          command.receiveException(e);                        }                      });                  }                }                public void receiveException(Exception e) {                  // If the lookup message failed , we then try to fetch all of the handles, just                  // in case.  This may fail too, but at least we tried.                  receiveResult(null);                }              });          }        }      });  }  /**   * Retrieves the handles of up to max replicas of the object stored in this   * instance of Past with the given ID. Asynchronously returns an array of   * PastContentHandles as the result to the provided Continuation, or a   * PastException. Each replica handle is obtained from a different primary   * storage root for the the given key. If max exceeds the replication factor r   * of this Past instance, only r replicas are returned. This method will   * return a PastContentHandle[] array containing all of the handles.   *   * @param id the key to be queried   * @param max the maximal number of replicas requested   * @param command Command to be performed when the result is received   */  public void lookupHandles(final Id id, int max, final Continuation command) {    if (logger.level <= Logger.FINE) {      logger.log("Retrieving handles of up to " + max + " replicas of the object stored in Past with id " + id);    }    if (logger.level <= Logger.FINER) {      logger.log("Fetching up to " + max + " handles of " + id.toStringFull());    }    getHandles(id, max,      new StandardContinuation(command) {        public void receiveResult(Object o) {          NodeHandleSet replicas = (NodeHandleSet) o;          if (logger.level <= Logger.FINER) {            logger.log("Receiving replicas " + replicas + " for lookup Id " + id);          }          MultiContinuation multi =            new MultiContinuation(parent, replicas.size()) {              public Object getResult() {                PastContentHandle[] p = new PastContentHandle[result.length];                for (int i = 0; i < result.length; i++) {                  if (result[i] instanceof PastContentHandle) {                    p[i] = (PastContentHandle) result[i];                  }                }                return p;              }            };          for (int i = 0; i < replicas.size(); i++) {            lookupHandle(id, replicas.getHandle(i), multi.getSubContinuation(i));          }        }      });  }  /**   * Retrieves the handle for the given object stored on the requested node.   * Asynchronously returns a PostContentHandle (or null) to the provided   * continuation.   *   * @param id the key to be queried   * @param handle The node on which the handle is requested   * @param command Command to be performed when the result is received   */  public void lookupHandle(Id id, NodeHandle handle, Continuation command) {    if (logger.level <= Logger.FINE) {      logger.log("Retrieving handle for id " + id + " from node " + handle);    }    sendRequest(handle, new FetchHandleMessage(getUID(), id, getLocalNodeHandle(), handle.getId()),      new NamedContinuation("FetchHandleMessage to " + handle + " for " + id, command));  }  /**   * Retrieves the object associated with a given content handle. Asynchronously   * returns a PastContent object as the result to the provided Continuation, or   * a PastException. The client must authenticate the object. In case of   * failure, an alternate replica can be obtained using a different handle   * obtained via lookupHandles().   *   * @param command Command to be performed when the result is received   * @param handle DESCRIBE THE PARAMETER   */  public void fetch(PastContentHandle handle, Continuation command) {    if (logger.level <= Logger.FINE) {      logger.log("Retrieving object associated with content handle " + handle);    }    if (logger.level <= Logger.FINER) {      logger.log("Fetching object under id " + handle.getId().toStringFull() + " on " + handle.getNodeHandle());    }    NodeHandle han = handle.getNodeHandle();    sendRequest(han, new FetchMessage(getUID(), handle, getLocalNodeHandle(), han.getId()),      new NamedContinuation("FetchMessage to " + handle.getNodeHandle() + " for " + handle.getId(), command));  }  // ----- COMMON API METHODS -----  /**   * This method is invoked on applications when the underlying node is about to   * forward the given message with the provided target to the specified next   * hop. Applications can change the contents of the message, specify a   * different nextHop (through re-routing), or completely terminate the   * message.   *   * @param message The message being sent, containing an internal message along   *      with a destination key and nodeHandle next hop.   * @return Whether or not to forward the message further   */  public boolean forward(final RouteMessage message) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -