⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pastimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    Message internal;    try {      internal = message.getMessage(endpoint.getDeserializer());    } catch (IOException ioe) {      throw new RuntimeException(ioe);    }    if (internal instanceof LookupMessage) {      final LookupMessage lmsg = (LookupMessage) internal;      Id id = lmsg.getId();      // if it is a request, look in the cache      if (!lmsg.isResponse()) {        if (logger.level <= Logger.FINER) {          logger.log("Lookup message " + lmsg + " is a request; look in the cache");        }        if (storage.exists(id)) {          // deliver the message, which will do what we want          if (logger.level <= Logger.FINE) {            logger.log("Request for " + id + " satisfied locally - responding");          }          deliver(endpoint.getId(), lmsg);          return false;        }      }    } else if (internal instanceof LookupHandlesMessage) {      LookupHandlesMessage lmsg = (LookupHandlesMessage) internal;      if (!lmsg.isResponse()) {        if (endpoint.replicaSet(lmsg.getId(), lmsg.getMax()).size() == lmsg.getMax()) {          if (logger.level <= Logger.FINE) {            logger.log("Hijacking lookup handles request for " + lmsg.getId());          }          deliver(endpoint.getId(), lmsg);          return false;        }      }    }    return true;  }  /**   * This method is called on the application at the destination node for the   * given id.   
*   * @param id The destination id of the message   * @param message The message being sent   */  public void deliver(Id id, Message message) {    final PastMessage msg = (PastMessage) message;    if (msg.isResponse()) {      handleResponse((PastMessage) message);    } else {      if (logger.level <= Logger.INFO) {        logger.log("Received message " + message + " with destination " + id);      }      if (msg instanceof InsertMessage) {        final InsertMessage imsg = (InsertMessage) msg;        // make sure the policy allows the insert        if (policy.allowInsert(imsg.getContent())) {          inserts++;          storage.getObject(imsg.getContent().getId(),            new StandardContinuation(getResponseContinuation(msg)) {              public void receiveResult(Object o) {                try {                  // allow the object to check the insert, and then insert the data                  PastContent content = imsg.getContent().checkInsert(imsg.getContent().getId(), (PastContent) o);                  storage.store(imsg.getContent().getId(), null, content, parent);                } catch (PastException e) {                  parent.receiveException(e);                }              }            });        } else {          getResponseContinuation(msg).receiveResult(new Boolean(false));        }      } else if (msg instanceof LookupMessage) {        final LookupMessage lmsg = (LookupMessage) msg;        lookups++;        // if the data is here, we send the reply, as well as push a cached copy        // back to the previous node        storage.getObject(lmsg.getId(),          new StandardContinuation(getResponseContinuation(lmsg)) {            public void receiveResult(Object o) {              if (logger.level <= Logger.FINE) {                logger.log("Received object " + o + " for id " + lmsg.getId());              }              // send result back              parent.receiveResult(o);              // if possible, pushed copy into previous hop cache      
        if ((lmsg.getPreviousNodeHandle() != null) &&                (o != null) &&                (!((PastContent) o).isMutable())) {                NodeHandle handle = lmsg.getPreviousNodeHandle();                if (logger.level <= Logger.FINE) {                  logger.log("Pushing cached copy of " + ((PastContent) o).getId() + " to " + handle);                }                CacheMessage cmsg = new CacheMessage(getUID(), (PastContent) o, getLocalNodeHandle(), handle.getId());                //endpoint.route(null, cmsg, handle);              }            }          });      } else if (msg instanceof LookupHandlesMessage) {        LookupHandlesMessage lmsg = (LookupHandlesMessage) msg;        NodeHandleSet set = endpoint.replicaSet(lmsg.getId(), lmsg.getMax());        if (logger.level <= Logger.FINER) {          logger.log("Returning replica set " + set + " for lookup handles of id " + lmsg.getId() + " max " + lmsg.getMax() + " at " + endpoint.getId());        }        getResponseContinuation(msg).receiveResult(set);      } else if (msg instanceof FetchMessage) {        FetchMessage fmsg = (FetchMessage) msg;        lookups++;        Continuation c;//        c = getResponseContinuation(msg);        c = getFetchResponseContinuation(msg);        // has to be special to determine how to send the message        storage.getObject(fmsg.getHandle().getId(), c);      } else if (msg instanceof FetchHandleMessage) {        final FetchHandleMessage fmsg = (FetchHandleMessage) msg;        fetchHandles++;        storage.getObject(fmsg.getId(),          new StandardContinuation(getResponseContinuation(msg)) {            public void receiveResult(Object o) {              PastContent content = (PastContent) o;              if (content != null) {                if (logger.level <= Logger.FINE) {                  logger.log("Retrieved data for fetch handles of id " + fmsg.getId());                }                parent.receiveResult(content.getHandle(PastImpl.this));            
  } else {                parent.receiveResult(null);              }            }          });      } else if (msg instanceof CacheMessage) {        cache(((CacheMessage) msg).getContent());      } else {        if (logger.level <= Logger.SEVERE) {          logger.log("ERROR - Received message " + msg + "of unknown type.");        }      }    }  }  /**   * This method is invoked to inform the application that the given node has   * either joined or left the neighbor set of the local node, as the set would   * be returned by the neighborSet call.   *   * @param handle The handle that has joined/left   * @param joined Whether the node has joined or left   */  public void update(NodeHandle handle, boolean joined) {  }  // ----- REPLICATION MANAGER METHODS -----  /**   * This upcall is invoked to tell the client to fetch the given id, and to   * call the given command with the boolean result once the fetch is completed.   * The client *MUST* call the command at some point in the future, as the   * manager waits for the command to return before continuing.   
*   * @param id The id to fetch   * @param hint DESCRIBE THE PARAMETER   * @param command DESCRIBE THE PARAMETER   */  public void fetch(final Id id, NodeHandle hint, Continuation command) {    if (logger.level <= Logger.FINER) {      logger.log("Sending out replication fetch request for the id " + id);    }    policy.fetch(id, hint, backup, this,      new StandardContinuation(command) {        public void receiveResult(Object o) {          if (o == null) {            if (logger.level <= Logger.WARNING) {              logger.log("Could not fetch id " + id + " - policy returned null in namespace " + instance);            }            parent.receiveResult(new Boolean(false));          } else {            if (logger.level <= Logger.FINEST) {              logger.log("inserting replica of id " + id);            }            if (!(o instanceof PastContent)) {              if (logger.level <= Logger.WARNING) {                logger.log("ERROR! Not PastContent " + o.getClass().getName() + " " + o);              }            }            storage.getStorage().store(((PastContent) o).getId(), null, (PastContent) o, parent);          }        }      });  }  /**   * This upcall is to notify the client that the given id can be safely removed   * from the storage. The client may choose to perform advanced behavior, such   * as caching the object, or may simply delete it.   
*   * @param id The id to remove   * @param command DESCRIBE THE PARAMETER   */  public void remove(final Id id, Continuation command) {    if (backup != null) {      storage.getObject(id,        new StandardContinuation(command) {          public void receiveResult(Object o) {            backup.cache(id, storage.getMetadata(id), (Serializable) o,              new StandardContinuation(parent) {                public void receiveResult(Object o) {                  storage.unstore(id, parent);                }              });          }        });    } else {      storage.unstore(id, command);    }  }  /**   * This upcall should return the set of keys that the application currently   * stores in this range. Should return a empty IdSet (not null), in the case   * that no keys belong to this range.   *   * @param range the requested range   * @return DESCRIBE THE RETURN VALUE   */  public IdSet scan(IdRange range) {    return storage.getStorage().scan(range);  }  /**   * This upcall should return the set of keys that the application currently   * stores. Should return a empty IdSet (not null), in the case that no keys   * belong to this range.   *   * @return DESCRIBE THE RETURN VALUE   */  public IdSet scan() {    return storage.getStorage().scan();  }  /**   * This upcall should return whether or not the given id is currently stored   * by the client.   
*
   * @param id The id in question
   * @return Whether or not the id exists
   */
  public boolean exists(Id id) {
    return storage.getStorage().exists(id);
  }

  /**
   * Looks up the handles for the given id, asking for replicationFactor + 1 of
   * them, and tells the command whether any entry of the resulting array is a
   * PastContentHandle.
   *
   * @param id The id to look up
   * @param command Receives Boolean.TRUE if a handle was found, else
   *      Boolean.FALSE
   */
  public void existsInOverlay(Id id, Continuation command) {
    lookupHandles(id, replicationFactor + 1,
      new StandardContinuation(command) {
        public void receiveResult(Object result) {
          Object[] handles = (Object[]) result;
          boolean found = false;

          // stop at the first real content handle
          for (int pos = 0; pos < handles.length && !found; pos++) {
            found = handles[pos] instanceof PastContentHandle;
          }

          parent.receiveResult(found ? Boolean.TRUE : Boolean.FALSE);
        }
      });
  }

  /**
   * Reads the object stored under the given id and passes it to insert. The
   * command is told whether at least one entry of the Boolean array produced
   * by that insert is true.
   *
   * @param id The id of the stored object to insert again
   * @param command Receives Boolean.TRUE if any insert entry is true, else
   *      Boolean.FALSE
   */
  public void reInsert(Id id, Continuation command) {
    storage.getObject(id,
      new StandardContinuation(command) {
        public void receiveResult(final Object o) {
          insert((PastContent) o,
            new StandardContinuation(parent) {
              public void receiveResult(Object result) {
                Boolean[] outcomes = (Boolean[]) result;
                int pos = 0;

                // advance past every unsuccessful entry
                while (pos < outcomes.length && !outcomes[pos].booleanValue()) {
                  pos++;
                }

                parent.receiveResult(pos < outcomes.length ? Boolean.TRUE : Boolean.FALSE);
              }
            });
        }
      });
  }

  /**
   * Deserializer for the message types handled by this class: it maps a
   * message type code to the matching message class's build method.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  protected class PastDeserializer implements MessageDeserializer {
    /**
     * Builds the message identified by the given type code from the buffer.
     *
     * @param buf The buffer handed to the selected build method
     * @param type The type code selecting which message class to build
     * @param priority Not used by this implementation
     * @param sender Not used by this implementation
     * @return The message built from the buffer
     * @exception IOException If the selected build method throws it; the
     *      exception is logged at SEVERE level and rethrown
     * @exception IllegalArgumentException If the type code matches none of the
     *      known message types
     */
    public Message deserialize(InputBuffer buf, short type, byte priority,
                               NodeHandle sender) throws IOException {
      try {
        switch (type) {
          case CacheMessage.TYPE:
            return CacheMessage.build(buf, endpoint, contentDeserializer);
          case FetchHandleMessage.TYPE:
            return FetchHandleMessage.build(buf, endpoint, contentHandleDeserializer);
          case FetchMessage.TYPE:
            return FetchMessage.build(buf, endpoint, contentDeserializer, contentHandleDeserializer);
          case InsertMessage.TYPE:
            return InsertMessage.build(buf, endpoint, contentDeserializer);
          case LookupHandlesMessage.TYPE:
            return LookupHandlesMessage.build(buf, endpoint);
          case LookupMessage.TYPE:
            return LookupMessage.build(buf, endpoint, contentDeserializer);
          default:
            // not an IOException, so it passes straight through the catch below
            throw new IllegalArgumentException("Unknown type:" + type + " in " + PastImpl.this.toString());
        }
      } catch (IOException e) {
        if (logger.level <= Logger.SEVERE) {
          logger.log("Exception in deserializer in " + PastImpl.this.endpoint.toString() + ":" + instance + " " + e);
        }
        throw e;
      }
    }
  }

  /**
   * Class which builds a message
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  public interface MessageBuilder {
    /**
     * Builds a message.
     *
     * @return The PastMessage that was built
     */
    PastMessage buildMessage();
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -