📄 replicationmanagerimpl.java
字号:
/** * This method is invoked to inform the application that the given node has * either joined or left the neighbor set of the local node, as the set would * be returned by the neighborSet call. * * @param handle The handle that has joined/left * @param joined Whether the node has joined or left */ public void update(NodeHandle handle, boolean joined) { } /** * Inner class which keeps track of the state we're in- waiting, sleeping, or * with nothing to do. * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ protected class ReplicationManagerHelper { /** * The set of possible states we can be in */ public int STATE_NOTHING = 0; /** * DESCRIBE THE FIELD */ public int STATE_WAITING = 1; /** * DESCRIBE THE FIELD */ public int STATE_SLEEPING = 2; /** * The current state that we are in */ protected int state; /** * The set of keys we have yet to fetch */ protected IdSet set; /** * The next message UID which is available */ protected Id current; /** * A cache of hints, mapping Id -> NodeHandle */ protected HashMap hints; /** * Constructor */ public ReplicationManagerHelper() { set = factory.buildIdSet(); hints = new HashMap(); state = STATE_NOTHING; } /** * Interal method which safely takes the next id to be fetched from the set * of pending keys * * @return The next key to be fetched */ protected synchronized Id getNextId() { if (set.numElements() == 0) { if (logger.level <= Logger.WARNING) { logger.log("GetNextId called without any ids available - aborting"); } return null; } current = (Id) set.getIterator().next(); set.removeId(current); if (logger.level <= Logger.FINER) { logger.log("Returing next id to fetch " + current); } if (!client.exists(current)) { return current; } else { return getNextId(); } } /** * Method by which the range is set, which will delete any keys from the to * fetch list not in the range * * @param range The new range */ public synchronized void setRange(IdRange range) { IdRange notRange = range.getComplementRange(); /* * first, we remove any non-relevant keys from the list of pending keys */ Iterator i = set.subSet(notRange).getIterator(); /* * now look for any matching ids */ while (i.hasNext()) { Id id = (Id) i.next(); set.removeId(id); hints.remove(id); } } /** * Method by which keys are added to the list of keys to fetch * * @param keySet The keys to add * @param hint DESCRIBE THE PARAMETER */ public synchronized void fetch(IdSet keySet, NodeHandle hint) { Iterator i = keySet.getIterator(); while (i.hasNext()) { Id id = (Id) i.next(); if (!(set.isMemberId(id) || client.exists(id) || ((current != null) && (id.equals(current))))) { set.addId(id); hints.put(id, hint); } } if ((state == STATE_NOTHING) && (set.numElements() > 0)) { send(); } } /** * In this case, it returns the list of keys the client has, along with the * keys which we have yet to tell the client to fetch. * * @param range the requested range * @return DESCRIBE THE RETURN VALUE */ public IdSet scan(IdRange range) { return set.subSet(range); } /** * Method which determines if a message should be sent, and if so, sends it */ protected synchronized void send() { if ((state != STATE_WAITING) && (set.numElements() > 0)) { Id id = getNextId(); NodeHandle hint = (NodeHandle) hints.remove(id); if (id != null) { state = STATE_WAITING; informClient(id, hint); } else { state = STATE_NOTHING; } } else if (state != STATE_WAITING) { state = STATE_NOTHING; } } /** * DESCRIBE THE METHOD */ public synchronized void wakeup() { if (state == STATE_SLEEPING) { send(); } } /** * DESCRIBE THE METHOD * * @param id DESCRIBE THE PARAMETER */ public synchronized void message(Id id) { if ((state == STATE_WAITING) && (current != null) && (current.equals(id))) { state = STATE_SLEEPING; current = null; scheduleNext(); } } } /** * Inner class which keeps track of the keys which we are currently deleting * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ protected class ReplicationManagerDeleter implements Continuation { /** * The set of ids we are responsible for deleting */ protected IdSet set; /** * Whether or not we are waiting for a response */ protected Id id; /** * Bulds a new one */ public ReplicationManagerDeleter() { set = factory.buildIdSet(); } /** * Adds a set of ids to the to-delete list * * @param range The current responsible range */ public synchronized void setRange(IdRange range) { IdRange notRange = range.getComplementRange(); // first, we add all of the clients stuff in the not-range Iterator i = client.scan(notRange).getIterator(); int count = 0; while (i.hasNext() && (count < NUM_DELETE_AT_ONCE)) { count++; Id next = (Id) i.next(); if ((id == null) || (!(id.equals(next)))) { set.addId(next); } } // next, we remove and ids from the to-delete list which are not in the range Iterator j = set.subSet(range).getIterator(); while (j.hasNext()) { set.removeId((Id) j.next()); } go(); } /** * Internal method which starts the deleting, if it's not already started */ protected synchronized void go() { if ((id == null) && (set.numElements() > 0)) { id = (Id) set.getIterator().next(); set.removeId(id); if (logger.level <= Logger.FINER) { logger.log("Deciding whether to remove " + id); } client.existsInOverlay(id, new StandardContinuation(this) { public void receiveResult(Object result) { if (Boolean.TRUE.equals(result)) { if (logger.level <= Logger.FINER) { logger.log("Telling client to delete id " + id); } if (logger.level <= Logger.FINER) { logger.log("RMImpl.go " + instance + ": removing id " + id); } client.remove(id, parent); } else { if (logger.level <= Logger.FINER) { logger.log("Object to remove " + id + " not found. Reinserting."); } client.reInsert(id, new StandardContinuation(parent) { public void receiveResult(Object result) { if (Boolean.TRUE.equals(result)) { if (logger.level <= Logger.FINER) { logger.log("Telling client to delete id " + id); } if (logger.level <= Logger.FINER) { logger.log("RMImpl.go " + instance + ": removing id " + id); } client.remove(id, parent); } else { if (logger.level <= Logger.FINER) { logger.log("Object to remove " + id + " Could not be reinserted. Ignoring remove."); } receiveResult(Boolean.FALSE); } } }); } } }); } } /** * Implementation of continuation * * @param o The result */ public synchronized void receiveResult(Object o) { if (id == null) { if (logger.level <= Logger.SEVERE) { logger.log("ERROR: RMImpl.deleter Received result " + o + " unexpectedly!"); } } if (!Boolean.TRUE.equals(o)) { if (logger.level <= Logger.SEVERE) { logger.log("ERROR: RMImpl.deleter Unstore of " + id + " did not succeed '" + o + "'!"); } } id = null; go(); } /** * Implementation of continuation * * @param e DESCRIBE THE PARAMETER */ public synchronized void receiveException(Exception e) { if (logger.level <= Logger.SEVERE) { logger.logException("RMImpl.deleter Unstore of " + id + " caused exception '" + e + "'!", e); } id = null; go(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -