📄 pastimpl.java
字号:
/*************************************************************************

"FreePastry" Peer-to-Peer Application Development Substrate

Copyright 2002, Rice University. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

- Neither the name of Rice University (RICE) nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

This software is provided by RICE and the contributors on an "as is"
basis, without any representations or warranties of any kind, express
or implied including, but not limited to, representations or
warranties of non-infringement, merchantability or fitness for a
particular purpose.
In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package rice.p2p.past;import java.io.*;import java.nio.ByteBuffer;import java.util.*;import java.util.logging.*;import rice.*;import rice.Continuation.*;import rice.environment.Environment;import rice.environment.logging.Logger;import rice.environment.params.Parameters;import rice.p2p.commonapi.*;import rice.p2p.commonapi.appsocket.*;import rice.p2p.commonapi.rawserialization.*;import rice.p2p.past.PastPolicy.*;import rice.p2p.past.messaging.*;import rice.p2p.past.rawserialization.*;import rice.p2p.replication.*;import rice.p2p.replication.manager.*;import rice.p2p.util.MathUtils;import rice.p2p.util.rawserialization.*;import rice.persistence.*;/** * @(#) PastImpl.java This is an implementation of the Past interface. 
* * @version $Id: PastImpl.java 3274 2006-05-15 16:17:47Z jeffh $ * @author Alan Mislove * @author Ansley Post * @author Peter Druschel */public class PastImpl implements Past, Application, ReplicationManagerClient { // ----- STATIC FIELDS ----- // the number of milliseconds to wait before declaring a message lost /** * DESCRIBE THE FIELD */ public final int MESSAGE_TIMEOUT; // = 30000; // the percentage of successful replica inserts in order to declare success /** * DESCRIBE THE FIELD */ public final double SUCCESSFUL_INSERT_THRESHOLD; // = 0.5; // ----- VARIABLE FIELDS ----- // this application's endpoint /** * DESCRIBE THE FIELD */ protected Endpoint endpoint; // the storage manager used by this Past /** * DESCRIBE THE FIELD */ protected StorageManager storage; // The trash can, or where objects should go once removed. If null, they are deleted /** * DESCRIBE THE FIELD */ protected StorageManager trash; // The backup store, or location of over-replicated objects, helping PAST to better deal with churn /** * DESCRIBE THE FIELD */ protected Cache backup; // the replication factor for Past /** * DESCRIBE THE FIELD */ protected int replicationFactor; // the replica manager used by Past /** * DESCRIBE THE FIELD */ protected ReplicationManager replicaManager; // the policy used for application-specific behavior /** * DESCRIBE THE FIELD */ protected PastPolicy policy; // the unique ids used by the messages sent across the wire private int id; // the hashtable of outstanding messages private Hashtable outstanding; // the hashtable of outstanding timer tasks private Hashtable timers; // the factory for manipulating ids /** * DESCRIBE THE FIELD */ protected IdFactory factory; // the instance name we are running with /** * DESCRIBE THE FIELD */ protected String instance; // debug variables /** * DESCRIBE THE FIELD */ public int inserts = 0; /** * DESCRIBE THE FIELD */ public int lookups = 0; /** * DESCRIBE THE FIELD */ public int fetchHandles = 0; /** * DESCRIBE THE FIELD 
*/ public int other = 0; /** * DESCRIBE THE FIELD */ protected Environment environment; /** * DESCRIBE THE FIELD */ protected Logger logger; /** * DESCRIBE THE FIELD */ protected PastContentDeserializer contentDeserializer; /** * DESCRIBE THE FIELD */ protected PastContentHandleDeserializer contentHandleDeserializer; /** * DESCRIBE THE FIELD */ public SocketStrategy socketStrategy; /** * AppSocket -> ByteBuffer[] Used for receiving the objects. */ WeakHashMap pendingSocketTransactions = new WeakHashMap(); /** * Constructor for Past, using the default policy * * @param node The node below this Past implementation * @param manager The storage manager to be used by Past * @param replicas The number of object replicas * @param instance The unique instance name of this Past */ public PastImpl(Node node, StorageManager manager, int replicas, String instance) { this(node, manager, replicas, instance, new DefaultPastPolicy()); } /** * Constructor for Past * * @param node The node below this Past implementation * @param manager The storage manager to be used by Past * @param replicas The number of object replicas * @param instance The unique instance name of this Past * @param policy DESCRIBE THE PARAMETER */ public PastImpl(Node node, StorageManager manager, int replicas, String instance, PastPolicy policy) { this(node, manager, null, replicas, instance, policy, null); } /** * Constructor for PastImpl. 
* * @param node DESCRIBE THE PARAMETER * @param manager DESCRIBE THE PARAMETER * @param backup DESCRIBE THE PARAMETER * @param replicas DESCRIBE THE PARAMETER * @param instance DESCRIBE THE PARAMETER * @param policy DESCRIBE THE PARAMETER * @param trash DESCRIBE THE PARAMETER */ public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash) { this(node, manager, backup, replicas, instance, policy, trash, false); } /** * @param node * @param manager * @param backup * @param replicas * @param instance * @param policy * @param trash * @param useOwnSocket send all inserts/fetches over a socket (default is * false) */ public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash, boolean useOwnSocket) { this(node, manager, backup, replicas, instance, policy, trash, new DefaultSocketStrategy(useOwnSocket)); } /** * Constructor for Past * * @param node The node below this Past implementation * @param manager The storage manager to be used by Past * @param replicas The number of object replicas * @param instance The unique instance name of this Past * @param backup DESCRIBE THE PARAMETER * @param policy DESCRIBE THE PARAMETER * @param trash DESCRIBE THE PARAMETER * @param strategy DESCRIBE THE PARAMETER */ public PastImpl(Node node, StorageManager manager, Cache backup, int replicas, String instance, PastPolicy policy, StorageManager trash, SocketStrategy strategy) { this.environment = node.getEnvironment(); logger = environment.getLogManager().getLogger(getClass(), instance); Parameters p = environment.getParameters(); MESSAGE_TIMEOUT = p.getInt("p2p_past_messageTimeout"); // = 30000; SUCCESSFUL_INSERT_THRESHOLD = p.getDouble("p2p_past_successfulInsertThreshold"); // = 0.5; this.socketStrategy = strategy; this.storage = manager; this.backup = backup; this.contentDeserializer = new JavaPastContentDeserializer(); 
this.contentHandleDeserializer = new JavaPastContentHandleDeserializer(); this.endpoint = node.buildEndpoint(this, instance); this.endpoint.setDeserializer(new PastDeserializer()); this.factory = node.getIdFactory(); this.policy = policy; this.instance = instance; this.trash = trash; this.id = Integer.MIN_VALUE; this.outstanding = new Hashtable(); this.timers = new Hashtable(); this.replicationFactor = replicas; // log.addHandler(new ConsoleHandler()); // log.setLevel(Level.FINE); // log.getHandlers()[0].setLevel(Level.FINE); this.replicaManager = buildReplicationManager(node, instance); this.endpoint.accept( new AppSocketReceiver() { public void receiveSocket(AppSocket socket) { if (logger.level <= Logger.FINE) { logger.log("Received Socket from " + socket); } socket.register(true, false, 10000, this); endpoint.accept(this); } public void receiveSelectResult(AppSocket socket, boolean canRead, boolean canWrite) { if (logger.level <= Logger.FINER) { logger.log("Reading from " + socket); } try { ByteBuffer[] bb = (ByteBuffer[]) pendingSocketTransactions.get(socket); if (bb == null) { // this is a new message // read the size bb = new ByteBuffer[1]; bb[0] = ByteBuffer.allocate(4); if (socket.read(bb, 0, 1) == -1) { close(socket); return; } // TODO: need to handle the condition where we don't read the whole size... 
byte[] sizeArr = bb[0].array(); int size = MathUtils.byteArrayToInt(sizeArr); if (logger.level <= Logger.FINER) { logger.log("Found object of size " + size + " from " + socket); } // allocate a buffer to store the object, save it in the pst bb[0] = ByteBuffer.allocate(size); pendingSocketTransactions.put(socket, bb); } // now we have a bb // read some bytes if (socket.read(bb, 0, 1) == -1) { close(socket); } // deserialize or reregister if (bb[0].remaining() == 0) { // make sure to clear things up so we can keep receiving pendingSocketTransactions.remove(socket); if (logger.level <= Logger.FINEST) { logger.log("bb[0].limit() " + bb[0].limit() + " bb[0].remaining() " + bb[0].remaining() + " from " + socket); } // deserialize the object SimpleInputBuffer sib = new SimpleInputBuffer(bb[0].array()); short type = sib.readShort(); PastMessage result = (PastMessage) endpoint.getDeserializer().deserialize(sib, type, (byte) 0, null); deliver(null, result); } // there will be more data on the socket if we haven't received everything yet // need to register either way to be able to read from the sockets when they are closed remotely, could alternatively close early // cause we are currently only sending 1 message per socket, but it's better to just keep reading in case we one day reuse sockets socket.register(true, false, 10000, this); // recursive call to handle next object // cant do this becasue calling read when not ready throws an exception// receiveSelectResult(socket, canRead, canWrite); } catch (IOException ioe) { receiveException(socket, ioe); } } public void receiveException(AppSocket socket, Exception e) { if (logger.level <= Logger.WARNING) { logger.logException("Error receiving message", e); } close(socket); } public void close(AppSocket socket) { if (socket == null) { return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -