📄 replicationmanagerimpl.java
字号:
/*************************************************************************"FreePastry" Peer-to-Peer Application Development Substrate Copyright 2002, Rice University. All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:- Redistributions of source code must retain the above copyrightnotice, this list of conditions and the following disclaimer.- Redistributions in binary form must reproduce the above copyrightnotice, this list of conditions and the following disclaimer in thedocumentation and/or other materials provided with the distribution.- Neither the name of Rice University (RICE) nor the names of itscontributors may be used to endorse or promote products derived fromthis software without specific prior written permission.This software is provided by RICE and the contributors on an "as is"basis, without any representations or warranties of any kind, expressor implied including, but not limited to, representations orwarranties of non-infringement, merchantability or fitness for aparticular purpose. In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.p2p.replication.manager;import java.util.*;import java.util.logging.*;import rice.*;import rice.Continuation.*;import rice.environment.Environment;import rice.environment.logging.Logger;import rice.environment.params.Parameters;import rice.p2p.commonapi.*;import rice.p2p.replication.*;import rice.p2p.replication.manager.messaging.*;/** * @(#) ReplicationManagerImpl.java This class is the default provided * implementation of the replication manager used. * * @version $Id: ReplicationManagerImpl.java 3274 2006-05-15 16:17:47Z jeffh $ * @author Alan Mislove */public class ReplicationManagerImpl implements ReplicationManager, ReplicationClient, Application { /** * The amount of time to wait between fetch calls to the client */ public final int FETCH_DELAY; /** * The amount of time to wait before giving up on a client fetch */ public final int TIMEOUT_DELAY; /** * The number of ids to delete at a given time - others will be deleted later */ public final int NUM_DELETE_AT_ONCE; /** * The id factory used for manipulating ids */ protected IdFactory factory; /** * The endpoint used for sending reminder messages */ protected Endpoint endpoint; /** * The replication used by the manager */ protected ReplicationImpl replication; /** * The client of this manager */ protected ReplicationManagerClient client; /** * The helper for the replication manager */ protected ReplicationManagerHelper helper; /** * The deleter, for managing ids to delete */ protected ReplicationManagerDeleter deleter; /** * DESCRIBE THE FIELD */ protected String instance; /** * DESCRIBE THE FIELD */ protected Environment environment; /** * DESCRIBE THE FIELD */ protected Logger logger; /** * Constructor * * @param node The node below this Replication implementation * @param client The client for this Replication * @param replicationFactor The replication factor for this instance * @param instance The unique instance name of this Replication */ public ReplicationManagerImpl(Node node, ReplicationManagerClient client, int replicationFactor, String instance) { this(node, client, replicationFactor, instance, null); } /** * Constructor * * @param node The node below this Replication implementation * @param client The client for this Replication * @param replicationFactor The replication factor for this instance * @param instance The unique instance name of this Replication * @param policy The replication policy to use */ public ReplicationManagerImpl(Node node, ReplicationManagerClient client, int replicationFactor, String instance, ReplicationPolicy policy) { this.environment = node.getEnvironment(); logger = environment.getLogManager().getLogger(ReplicationManagerImpl.class, instance); Parameters p = environment.getParameters(); FETCH_DELAY = p.getInt("p2p_replication_manager_fetch_delay"); TIMEOUT_DELAY = p.getInt("p2p_replication_manager_timeout_delay"); NUM_DELETE_AT_ONCE = p.getInt("p2p_replication_manager_num_delete_at_once"); this.client = client; this.factory = node.getIdFactory(); this.endpoint = node.buildEndpoint(this, instance); this.helper = new ReplicationManagerHelper(); this.deleter = new ReplicationManagerDeleter(); this.instance = instance; if (logger.level <= Logger.FINE) { logger.log("Starting up ReplicationManagerImpl with client " + client); } this.replication = new ReplicationImpl(node, this, replicationFactor, instance, policy); endpoint.register(); } // ----- UTILITY METHODS ----- /** * Utility method which returns the underlying replication object. Should only * be used for testing - messing with this causes undefined behavior. * * @return The underlying replication object */ public Replication getReplication() { return replication; } /** * This upcall is to notify the application of the range of keys for which it * is responsible. The application might choose to react to call by calling a * scan(complement of this range) to the persistance manager and get the keys * for which it is not responsible and call delete on the persistance manager * for those objects. * * @param range the range of keys for which the local node is currently * responsible */ public void setRange(final IdRange range) { if (logger.level <= Logger.FINEST) { logger.log("Removing range " + range + " from the list of pending ids"); } helper.setRange(range); deleter.setRange(range); } // ----- INTERNAL METHODS ----- /** * Internal method which clones an IdSet, so that iterators work as expected * * @param keySet The set to clone * @return The cloned set */ protected IdSet clone(IdSet keySet) { IdSet result = factory.buildIdSet(); Iterator i = keySet.getIterator(); while (i.hasNext()) { result.addId((Id) i.next()); } return result; } /** * Internal method which informs the client of the next id to fetch * * @param id The id which the client should fetch * @param hint The hint where the id may be */ protected void informClient(final Id id, NodeHandle hint) { if (logger.level <= Logger.FINE) { logger.log("Telling client to fetch id " + id); } final CancellableTask timer = endpoint.scheduleMessage(new TimeoutMessage(id), TIMEOUT_DELAY); client.fetch(id, hint, new Continuation() { public void receiveResult(Object o) { if (!(new Boolean(true)).equals(o)) { if (o instanceof Throwable) { if (logger.level <= Logger.WARNING) { logger.logException("Fetching of id " + id + " failed with ", (Throwable) o); } } else { if (logger.level <= Logger.WARNING) { logger.log("Fetching of id " + id + " failed with " + o); } } } if (logger.level <= Logger.FINE) { logger.log("Successfully fetched id " + id); } timer.cancel(); helper.message(id); } public void receiveException(Exception e) { receiveResult(e); } }); } /** * Internal method which schedules the next reminder message (if it is * necessary), or simply resets the active flag if there's nothing to be * fetched. */ protected void scheduleNext() { if (logger.level <= Logger.FINER) { logger.log("Scheduling next fetch in " + FETCH_DELAY + " milliseconds"); } endpoint.scheduleMessage(new ReminderMessage(), FETCH_DELAY); } // ----- REPLICATION METHODS ----- /** * This upcall is invoked to notify the application that is should fetch the * cooresponding keys in this set, since the node is now responsible for these * keys also. * * @param keySet set containing the keys that needs to be fetched * @param hint DESCRIBE THE PARAMETER */ public void fetch(IdSet keySet, NodeHandle hint) { // log.finer(endpoint.getId() + ": Adding keyset " + keySet + " to the list of pending ids"); helper.fetch(keySet, hint); } /** * This upcall should return the set of keys that the application currently * stores in this range. Should return a empty IdSet (not null), in the case * that no keys belong to this range. In this case, it returns the list of * keys the client has, along with the keys which we have yet to tell the * client to fetch. * * @param range the requested range * @return DESCRIBE THE RETURN VALUE */ public IdSet scan(IdRange range) { return client.scan(range); } // ----- COMMONAPI METHODS ----- /** * This method is invoked on applications when the underlying node is about to * forward the given message with the provided target to the specified next * hop. Applications can change the contents of the message, specify a * different nextHop (through re-routing), or completely terminate the * message. * * @param message The message being sent, containing an internal message along * with a destination key and nodeHandle next hop. * @return Whether or not to forward the message further */ public boolean forward(RouteMessage message) { return true; } /** * This method is called on the application at the destination node for the * given id. * * @param id The destination id of the message * @param message The message being sent */ public void deliver(Id id, Message message) { if (message instanceof ReminderMessage) { if (logger.level <= Logger.FINEST) { logger.log("Received reminder message"); } helper.wakeup(); } else if (message instanceof TimeoutMessage) { if (logger.level <= Logger.FINEST) { logger.log("Received timeout message"); } helper.message(((TimeoutMessage) message).getId()); } else { if (logger.level <= Logger.WARNING) { logger.log("Received unknown message " + message); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -