📄 scribeimpl.java
字号:
/*************************************************************************

"FreePastry" Peer-to-Peer Application Development Substrate

Copyright 2002, Rice University. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

- Neither the name of Rice University (RICE) nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

This software is provided by RICE and the contributors on an "as is"
basis, without any representations or warranties of any kind, express
or implied including, but not limited to, representations or
warranties of non-infringement, merchantability or fitness for a
particular purpose. In no event shall RICE or contributors be liable
for any direct, indirect, incidental, special, exemplary, or
consequential damages (including, but not limited to, procurement of
substitute goods or services; loss of use, data, or profits; or
business interruption) however caused and on any theory of liability,
whether in contract, strict liability, or tort (including negligence
or otherwise) arising in any way out of the use of this software, even
if advised of the possibility of such damage.

********************************************************************************/
package rice.p2p.scribe;

import java.io.IOException;
import java.util.*;
import java.util.prefs.*;

import rice.*;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.environment.params.Parameters;
import rice.p2p.commonapi.*;
import rice.p2p.commonapi.rawserialization.*;
import rice.p2p.scribe.messaging.*;
import rice.p2p.scribe.rawserialization.*;

/**
 * @(#) ScribeImpl.java The provided implementation of Scribe.
 *
 * @version $Id: ScribeImpl.java 3274 2006-05-15 16:17:47Z jeffh $
 * @author Alan Mislove
 */
public class ScribeImpl implements Scribe, Application {

  /**
   * The interval with which to perform maintenance. Read in the constructor
   * from the "p2p_scribe_maintenance_interval" parameter and used there as
   * the period of the scheduled MaintenanceMessage.
   */
  public final int MAINTENANCE_INTERVAL;

  /**
   * The timeout for a subscribe message. Read in the constructor from the
   * "p2p_scribe_message_timeout" parameter.
   */
  public final int MESSAGE_TIMEOUT;

  /**
   * The hashtable of topic -> TopicManager
   */
  public Hashtable topics;

  /**
   * This scribe's policy
   */
  protected ScribePolicy policy;

  /**
   * This application's endpoint
   */
  protected Endpoint endpoint;

  /**
   * The local node handle
   */
  protected NodeHandle handle;

  /**
   * The hashtable of outstanding messages. sendSubscribe stores entries as
   * subscribe message id (Integer) -> the ScribeClient the message was sent for.
   */
  private Hashtable outstanding;

  /**
   * The hashtable of outstanding lost messages
   */
  private Hashtable lost;

  /**
   * The next unique id. Initialised to Integer.MIN_VALUE in the constructor
   * and incremented before each subscribe message is sent.
   */
  private int id;

  /**
   * The environment of the node this Scribe runs on
   */
  Environment environment;

  /**
   * The logger for this Scribe instance
   */
  Logger logger;

  /**
   * The instance name this Scribe was constructed with
   */
  private String instance;

  /**
   * The deserializer handed to the message builders for message content
   */
  ScribeContentDeserializer contentDeserializer;

  /**
   * Constructor for Scribe, using the default policy.
* * @param node The node below this Scribe implementation * @param instance The unique instance name of this Scribe */ public ScribeImpl(Node node, String instance) { this(node, new ScribePolicy.DefaultScribePolicy(node.getEnvironment()), instance); } /** * Constructor for Scribe * * @param node The node below this Scribe implementation * @param policy The policy for this Scribe * @param instance The unique instance name of this Scribe */ public ScribeImpl(Node node, ScribePolicy policy, String instance) { this.environment = node.getEnvironment(); logger = environment.getLogManager().getLogger(ScribeImpl.class, instance); Parameters p = environment.getParameters(); MAINTENANCE_INTERVAL = p.getInt("p2p_scribe_maintenance_interval"); MESSAGE_TIMEOUT = p.getInt("p2p_scribe_message_timeout"); this.instance = instance; this.endpoint = node.buildEndpoint(this, instance); this.contentDeserializer = new JavaScribeContentDeserializer(); this.endpoint.setDeserializer( new MessageDeserializer() { public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException { try { switch (type) { case AnycastMessage.TYPE: return AnycastMessage.build(buf, endpoint, contentDeserializer); case SubscribeMessage.TYPE: return SubscribeMessage.buildSM(buf, endpoint, contentDeserializer); case SubscribeAckMessage.TYPE: return SubscribeAckMessage.build(buf, endpoint); case SubscribeFailedMessage.TYPE: return SubscribeFailedMessage.build(buf, endpoint); case DropMessage.TYPE: return DropMessage.build(buf, endpoint); case PublishMessage.TYPE: return PublishMessage.build(buf, endpoint, contentDeserializer); case PublishRequestMessage.TYPE: return PublishRequestMessage.build(buf, endpoint, contentDeserializer); case UnsubscribeMessage.TYPE: return UnsubscribeMessage.build(buf, endpoint); } } catch (IOException e) { if (logger.level <= Logger.SEVERE) { logger.log("Exception in deserializer in " + ScribeImpl.this.endpoint.toString() + ":" + 
ScribeImpl.this.instance + " " + contentDeserializer + " " + e); } throw e; } throw new IllegalArgumentException("Unknown type:" + type); } }); this.topics = new Hashtable(); this.outstanding = new Hashtable(); this.lost = new Hashtable(); this.policy = policy; this.handle = endpoint.getLocalNodeHandle(); this.id = Integer.MIN_VALUE; endpoint.register(); // schedule the period liveness checks of the parent endpoint.scheduleMessage(new MaintenanceMessage(), environment.getRandomSource().nextInt(MAINTENANCE_INTERVAL), MAINTENANCE_INTERVAL); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Starting up Scribe"); } } /** * Gets the Environment attribute of the ScribeImpl object * * @return The Environment value */ public Environment getEnvironment() { return environment; } /** * Returns the current policy for this scribe object * * @return The current policy for this scribe */ public ScribePolicy getPolicy() { return policy; } /** * Returns the Id of the local node * * @return The Id of the local node */ public Id getId() { return endpoint.getId(); } /** * Returns the list of clients for a given topic * * @param topic The topic to return the clients of * @return The clients of the topic */ public ScribeClient[] getClients(Topic topic) { if (topics.get(topic) != null) { return ((TopicManager) topics.get(topic)).getClients(); } return new ScribeClient[0]; } /** * Returns the list of children for a given topic * * @param topic The topic to return the children of * @return The children of the topic */ public NodeHandle[] getChildren(Topic topic) { if (topics.get(topic) != null) { return ((TopicManager) topics.get(topic)).getChildren(); } return new NodeHandle[0]; } /** * Returns the parent for a given topic * * @param topic The topic to return the parent of * @return The parent of the topic */ public NodeHandle getParent(Topic topic) { if (topics.get(topic) != null) { return ((TopicManager) topics.get(topic)).getParent(); } return null; } /** * Returns 
whether or not this Scribe is the root for the given topic * * @param topic The topic in question * @return Whether or not we are currently the root */ public boolean isRoot(Topic topic) { NodeHandleSet set = endpoint.replicaSet(topic.getId(), 1); if (set.size() == 0) { return false; } else { return set.getHandle(0).getId().equals(endpoint.getId()); } } /** * Returns the list of topics the given client is subscribed to. * * @param client The client in question * @return The list of topics */ public Topic[] getTopics(ScribeClient client) { Vector result = new Vector(); Enumeration e = topics.keys(); while (e.hasMoreElements()) { Topic topic = (Topic) e.nextElement(); if (((TopicManager) topics.get(topic)).containsClient(client)) { result.add(topic); } } return (Topic[]) result.toArray(new Topic[0]); } /** * Sets the current policy for this scribe object * * @param policy The current policy for this scribe */ public void setPolicy(ScribePolicy policy) { this.policy = policy; } /** * Sets the ContentDeserializer attribute of the ScribeImpl object * * @param deserializer The new ContentDeserializer value */ public void setContentDeserializer(ScribeContentDeserializer deserializer) { contentDeserializer = deserializer; } /** * Internal method for sending a subscribe message * * @param topic DESCRIBE THE PARAMETER * @param client DESCRIBE THE PARAMETER * @param content DESCRIBE THE PARAMETER */ private void sendSubscribe(Topic topic, ScribeClient client, RawScribeContent content) { sendSubscribe(topic, client, content, null); } /** * Internal method for sending a subscribe message * * @param topic DESCRIBE THE PARAMETER * @param client DESCRIBE THE PARAMETER * @param content DESCRIBE THE PARAMETER * @param previousParent DESCRIBE THE PARAMETER */ private void sendSubscribe(Topic topic, ScribeClient client, RawScribeContent content, Id previousParent) { id++; if (logger.level <= Logger.FINEST) { logger.log(endpoint.getId() + ": Sending subscribe message for topic " + 
topic + " client:" + client); } //logException(Logger.FINEST,"Stack Trace",new Exception("StackTrace")); // TODO: This should go to the ScribeImpl if (client == null) { ScribeClient[] clients = getClients(topic); if (clients.length > 0) { client = clients[0]; } } if (client != null) { outstanding.put(new Integer(id), client); } endpoint.route(topic.getId(), new SubscribeMessage(handle, topic, previousParent, id, content), null);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -