⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gcpastimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                  logger.log("REFRESH: ON PROCESSING THREAD!");                }                for (int i = 0; i < array.length; i++) {                  GCId id = (GCId) array[i];                  NodeHandleSet replicas = endpoint.replicaSet(id.getId(), replicationFactor + 1, set.getHandle(set.size() - 1), set);                  // if we have all of the replicas, go ahead and refresh this item                  if ((replicas != null) && ((replicas.size() == set.size()) || (replicas.size() == replicationFactor + 1))) {                    for (int j = 0; j < replicas.size(); j++) {                      map.addReplica(replicas.getHandle(j), id);                    }                    refreshed++;                    ids.removeId(id);                  }                }                if (logger.level <= Logger.FINE) {                  logger.log("REFRESH: DONE WITH PROCESSING THREAD - MOVING TO NORMAL THREAD!");                }                return null;              }            },            new StandardContinuation(parent) {              public void receiveResult(Object o) {                if (logger.level <= Logger.FINE) {                  logger.log("REFRESH: BACK ON NORMAL THREAD!");                }                final Iterator iterator = map.getReplicas();                Continuation send =                  new StandardContinuation(parent) {                    public void receiveResult(Object o) {                      if (iterator.hasNext()) {                        NodeHandle next = (NodeHandle) iterator.next();                        GCIdSet ids = map.getIds(next);                        if (logger.level <= Logger.FINE) {                          logger.log("REFRESH: SENDING REQUEST TO " + next + " FOR IDSET " + ids);                        }                        sendRequest(next, new GCRefreshMessage(getUID(), ids, getLocalNodeHandle(), next.getId()),                          new NamedContinuation("GCRefresh to " + next, this));                      } else {                        if (logger.level <= Logger.FINE) {                          logger.log("REFRESH: DONE SENDING REQUESTS, RECURSING");                        }                        refresh(ids, parent);                      }                    }                    public void receiveException(Exception e) {                      if (logger.level <= Logger.FINE) {                        logger.log("GOT EXCEPTION " + e + " REFRESHING ITEMS - CONTINUING");                      }                      receiveResult(null);                    }                  };                send.receiveResult(null);              }            });        }      });  }  /**   * This method is invoked on applications when the underlying node is about to   * forward the given message with the provided target to the specified next   * hop. Applications can change the contents of the message, specify a   * different nextHop (through re-routing), or completely terminate the   * message.   *   * @param message The message being sent, containing an internal message along   *      with a destination key and nodeHandle next hop.   * @return Whether or not to forward the message further   */  public boolean forward(final RouteMessage message) {    try {      if (message.getMessage(endpoint.getDeserializer()) instanceof GCLookupHandlesMessage) {        return true;      } else {        return super.forward(message);      }    } catch (IOException ioe) {      throw new RuntimeException(ioe);    }  }  /**   * This method is called on the application at the destination node for the   * given id.   *   * @param id The destination id of the message   * @param message The message being sent   */  public void deliver(Id id, Message message) {    final PastMessage msg = (PastMessage) message;    if (msg.isResponse()) {      super.deliver(id, message);    } else {      if (msg instanceof GCInsertMessage) {        final GCInsertMessage imsg = (GCInsertMessage) msg;        inserts++;        // make sure the policy allows the insert        if (policy.allowInsert(imsg.getContent())) {          Id theId = imsg.getContent().getId();          if (theId == null) {            if (logger.level <= Logger.SEVERE) {              logger.log("Error: null Id from " + imsg.getContent() + " from " + imsg + " in " + this);            }          }          storage.getObject(theId,            new StandardContinuation(getResponseContinuation(msg)) {              public void receiveResult(Object o) {                try {                  // allow the object to check the insert, and then insert the data                  GCPastContent content = (GCPastContent) imsg.getContent().checkInsert(imsg.getContent().getId(), (PastContent) o);                  storage.store(content.getId(), content.getMetadata(imsg.getExpiration()), content, parent);                } catch (PastException e) {                  parent.receiveException(e);                }              }            });        } else {          getResponseContinuation(msg).receiveResult(new Boolean(false));        }      } else if (msg instanceof GCRefreshMessage) {        final GCRefreshMessage rmsg = (GCRefreshMessage) msg;        final Iterator i = Arrays.asList(rmsg.getKeys()).iterator();        final Vector result = new Vector();        other += rmsg.getKeys().length;        StandardContinuation process =          new StandardContinuation(getResponseContinuation(msg)) {            public void receiveResult(Object o) {              if (o != null) {                result.addElement(o);              }              if (i.hasNext()) {                final GCId id = (GCId) i.next();                /*                 *  skip the object if we don't have it yet                 */                if (storage.exists(id.getId())) {                  GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(id.getId());                  if (metadata != null) {                    /*                     *  only allow the lifetime to be extended, otherwise skip                     */                    if (metadata.getExpiration() < id.getExpiration()) {                      storage.setMetadata(id.getId(), metadata.setExpiration(id.getExpiration()), this);                    } else {                      receiveResult(Boolean.FALSE);                    }                  } else {                    storage.getObject(id.getId(),                      new StandardContinuation(this) {                        public void receiveResult(Object o) {                          storage.setMetadata(id.getId(), ((GCPastContent) o).getMetadata(id.getExpiration()), parent);                        }                      });                  }                } else {                  /*                   *  but first check and see if it's in the trash, so we can uncollect it                   */                  if (trash != null) {                    trash.getObject(id.getId(),                      new StandardContinuation(this) {                        public void receiveResult(Object o) {                          if ((o != null) && (o instanceof GCPastContent)) {                            if (logger.level <= Logger.FINE) {                              logger.log(                                "GCREFRESH: Restoring object " + id + " from trash!");                            }                            GCPastContent content = (GCPastContent) o;                            storage.store(id.getId(), content.getMetadata(id.getExpiration()), content,                              new StandardContinuation(parent) {                                public void receiveResult(Object o) {                                  trash.unstore(id.getId(), parent);                                }                              });                          } else {                            parent.receiveResult(Boolean.FALSE);                          }                        }                      });                  } else {                    receiveResult(Boolean.FALSE);                  }                }              } else {                parent.receiveResult(result.toArray(new Boolean[0]));              }            }          };        process.receiveResult(null);      } else if (msg instanceof GCLookupHandlesMessage) {        GCLookupHandlesMessage lmsg = (GCLookupHandlesMessage) msg;        NodeHandleSet set = endpoint.neighborSet(lmsg.getMax());        set.removeHandle(getLocalNodeHandle().getId());        set.putHandle(getLocalNodeHandle());        if (logger.level <= Logger.FINER) {          logger.log("Returning neighbor set " + set + " for lookup handles of id " + lmsg.getId() + " max " + lmsg.getMax() + " at " + endpoint.getId());        }        getResponseContinuation(msg).receiveResult(set);      } else if (msg instanceof GCCollectMessage) {        // get all ids which expiration before now        collect(storage.scanMetadataValuesHead(new GCPastMetadata(environment.getTimeSource().currentTimeMillis())),          new ListenerContinuation("Removal of expired ids", environment) {            public void receiveResult(Object o) {              if (environment.getTimeSource().currentTimeMillis() > DEFAULT_EXPIRATION) {                collect(storage.scanMetadataValuesNull(), new ListenerContinuation("Removal of default expired ids", environment));              }            }          });      } else if (msg instanceof FetchHandleMessage) {        final FetchHandleMessage fmsg = (FetchHandleMessage) msg;        fetchHandles++;        storage.getObject(fmsg.getId(),          new StandardContinuation(getResponseContinuation(msg)) {            public void receiveResult(Object o) {              GCPastContent content = (GCPastContent) o;              if (content != null) {                if (logger.level <= Logger.FINE) {                  logger.log("Retrieved data for fetch handles of id " + fmsg.getId());                }                GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(fmsg.getId());                if (metadata != null) {                  parent.receiveResult(content.getHandle(GCPastImpl.this, metadata.getExpiration()));                } else {                  parent.receiveResult(content.getHandle(GCPastImpl.this, DEFAULT_EXPIRATION));                }              } else {                parent.receiveResult(null);              }            }          });      } else {        super.deliver(id, message);      }    }  }  /**   * Internal method which collects all of the objects in the given set   *   * @param command The command to call once done   * @param map DESCRIBE THE PARAMETER   */  protected void collect(SortedMap map, Continuation command) {    final Iterator i = map.keySet().iterator();    Continuation remove =      new StandardContinuation(command) {        public void receiveResult(Object o) {          if (i.hasNext()) {            final Id gid = (Id) i.next();            GCPastMetadata metadata = (GCPastMetadata) storage.getMetadata(gid);            collected++;            if (trash != null) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -