⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribeimpl.java

📁 p2p仿真器。开发者可以工作在覆盖层中进行创造和测试逻辑算法或者创建和测试新的服务。PlanetSim还可以将仿真代码平稳转换为在Internet上的实验代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*************************************************************************"Free Pastry" Peer-to-Peer Application Development SubstrateCopyright 2002, Rice University. All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:- Redistributions of source code must retain the above copyrightnotice, this list of conditions and the following disclaimer.- Redistributions in binary form must reproduce the above copyrightnotice, this list of conditions and the following disclaimer in thedocumentation and/or other materials provided with the distribution.- Neither  the name  of Rice  University (RICE) nor  the names  of itscontributors may be  used to endorse or promote  products derived fromthis software without specific prior written permission.This software is provided by RICE and the contributors on an "as is"basis, without any representations or warranties of any kind, expressor implied including, but not limited to, representations orwarranties of non-infringement, merchantability or fitness for aparticular purpose. In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************/package planet.scribe;import java.util.Arrays;import java.util.Hashtable;import java.util.Iterator;import java.util.List;import java.util.Observable;import java.util.Observer;import java.util.Set;import java.util.Vector;import planet.commonapi.Application;import planet.commonapi.EndPoint;import planet.commonapi.Id;import planet.commonapi.Message;import planet.commonapi.NodeHandle;import planet.commonapi.exception.InitializationException;import planet.generic.commonapi.factory.GenericFactory;import planet.scribe.messaging.AnycastMessage;import planet.scribe.messaging.DropMessage;import planet.scribe.messaging.PublishMessage;import planet.scribe.messaging.PublishRequestMessage;import planet.scribe.messaging.ReplicaSetMessage;import planet.scribe.messaging.SubscribeAckMessage;import planet.scribe.messaging.SubscribeFailedMessage;import planet.scribe.messaging.SubscribeLostMessage;import planet.scribe.messaging.SubscribeMessage;import planet.scribe.messaging.UnsubscribeMessage;import planet.simulate.Logger;/** * The provided implementation of Scribe. * * @version $Id: ScribeImpl.java,v 1.14 2003/10/22 03:16:40 amislove Exp $ * @author Alan Mislove */public class ScribeImpl implements Scribe, Application {  /**   * the timeout for a subscribe message   */  public static long MESSAGE_TIMEOUT = 15000;  /**   * the hashtable of topic -> TopicManager   */  public Hashtable topics;  /**   * this scribe's policy   */  protected ScribePolicy policy;  /**   * this application's endpoint   */  protected EndPoint endpoint;  /**   * the logger which we will use   */  //protected Logger log = null;  /**   * the local node handle   */  protected NodeHandle handle;  /**   * the hashtable of outstanding messages   */  private Hashtable outstanding;  /**   * the next unique id   */  private int id;    /**   * the next unique id   */  private String appId;  /**   * Constructor for Scribe, using the default policy.   *   * @param appId Application name   */  public ScribeImpl(String appId) {  	    this(appId,new ScribePolicy.DefaultScribePolicy());  }  /**   * Constructor for Scribe   *   * @param appId Application name   * @param policy The policy for this Scribe   */  public ScribeImpl(String appId,ScribePolicy policy) {    //this.endpoint = node.registerApplication(this, appId);  	this.appId = appId;    this.topics = new Hashtable();    this.outstanding = new Hashtable();    this.policy = policy;    //this.handle = endpoint.getLocalNodeHandle();    this.id = Integer.MIN_VALUE;        //log = Logger.getLogger(this.getClass().getName());     //System.out.println("antes -->"+log.getLevel());    //log.addHandler(new StreamHandler(System.out,new SimpleFormatter()));    //log.addHandler(new ConsoleHandler());    //log.setLevel(Level.ALL);    //System.out.println("despues-->"+log.getLevel());  }    /**   * An upcall to inform this Application for a new step.   * This method is invoked at the end of each simulation step.    * Before this, may arrive any number of application Messages,   * depending on your own main application.   */  public void byStep(){}  public void setEndPoint(EndPoint endpoint) {  	this.endpoint=endpoint;  	this.handle = endpoint.getLocalNodeHandle();  	Logger.log(endpoint.getId() + ": Starting up Scribe",Logger.EVENT_LOG);  }    /**   * Returns the current policy for this scribe object   *   * @return The current policy for this scribe   */  public ScribePolicy getPolicy() {    return policy;  }  /**   * Sets the current policy for this scribe object   *   * @param policy The current policy for this scribe   */  public void setPolicy(ScribePolicy policy) {    this.policy = policy;  }    /**   * Returns the Id of the local node   *   * @return The Id of the local node   */  public Id getNodeId() {	  return endpoint.getId();  }  /**   * Returns the Id of the application   *   * @return The Id of the application   */	  public String getId() {    return appId;  }    /**   * Sets the identification of this application.   * @param appId Identification of this application.   */  public void setId(String appId) {    this.appId = appId;  }  /**   * Returns the list of clients for a given topic   *   * @param topic The topic to return the clients of   * @return The clients of the topic   */  public ScribeClient[] getClients(Topic topic) {    if (topics.get(topic) != null) {      return ((TopicManager) topics.get(topic)).getClients();    }    return new ScribeClient[0];  }  /**   * Returns the list of children for a given topic   *   * @param topic The topic to return the children of   * @return The children of the topic   */  public NodeHandle[] getChildren(Topic topic) {    if (topics.get(topic) != null) {      return ((TopicManager) topics.get(topic)).getChildren();    }    return new NodeHandle[0];  }  /**   * Returns the parent for a given topic   *   * @param topic The topic to return the parent of   * @return The parent of the topic   */  public NodeHandle getParent(Topic topic) {    if (topics.get(topic) != null) {      return ((TopicManager) topics.get(topic)).getParent();    }    return null;  }  /**   * Returns whether or not this Scribe is the root for the given topic   *   * @param topic The topic in question   * @return Whether or not we are currently the root   */   public boolean isRoot(Topic topic) {        Vector set = endpoint.replicaSet(topic.getId(), 1);        if (set==null)          return false;        else          return ((NodeHandle) set.get(0)).getId().equals(endpoint.getId());          }    private void isRoot(Topic topic, IsRootListener irl) { 	  	endpoint.route(topic.getId(),new ReplicaSetMessage(endpoint.getId(),1),null);  }  /**   * Internal method for sending a subscribe message   *   * @param Topic topic   */  private void sendSubscribe(Topic topic, ScribeClient client, ScribeContent content) {    sendSubscribe(topic, client, content, null);  }  /**   * Internal method for sending a subscribe message   *   * @param Topic topic   */  private void sendSubscribe(Topic topic, ScribeClient client, ScribeContent content, Id previousParent) {    id++;    Logger.log(endpoint.getId() + ": Sending subscribe message for topic " + topic,Logger.EVENT_LOG);    if (client != null)      outstanding.put(new Integer(id), client);    endpoint.route(topic.getId(), new SubscribeMessage(handle, topic, previousParent, id, content), null);  }  /**   * Internal method which processes an ack message   *   * @param message The ackMessage   */  private void ackMessageReceived(SubscribeAckMessage message) {    outstanding.remove(new Integer(message.getId()));  }  /**   * Internal method which processes a subscribe failed message   *   * @param message THe lost message   */  private void failedMessageReceived(SubscribeFailedMessage message) {       ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId()));    if (client != null)      client.subscribeFailed(message.getTopic());  }  /**   * Internal method which processes a subscribe lost message   *   * @param message THe lost message   */  private void lostMessageReceived(SubscribeLostMessage message) {    ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId()));    if (client != null)      client.subscribeFailed(message.getTopic());  }  // ----- SCRIBE METHODS -----  /**   * Subscribes the given client to the provided topic. Any message published to the topic will be   * delivered to the Client via the deliver() method.   *   * @param topic The topic to subscribe to   * @param client The client to give messages to   */  public void subscribe(Topic topic, ScribeClient client) {    subscribe(topic, client, null);  }  /**   * Subscribes the given client to the provided topic. Any message published to the topic will be   * delivered to the Client via the deliver() method.   *   * @param topic The topic to subscribe to   * @param client The client to give messages to   */  public void subscribe(Topic topic, ScribeClient client, ScribeContent content) {    Logger.log(endpoint.getId() + ": Subscribing client " + client + " to topic " + topic,Logger.EVENT_LOG);  	//System.out.println(endpoint.getId() + ": Subscribing client " + client + " to topic " + topic);    // if we don't know about this topic, subscribe    // otherwise, we simply add the client to the list    if (topics.get(topic) == null) {      topics.put(topic, new TopicManager(topic, client));      sendSubscribe(topic, client, content);    } else {      TopicManager manager = (TopicManager) topics.get(topic);      manager.addClient(client);      if ((manager.getParent() == null) && (! isRoot(topic))) {        sendSubscribe(topic, client, content);      }    }  }  /**   * Unsubscribes the given client from the provided topic.   *   * @param topic The topic to unsubscribe from   * @param client The client to unsubscribe   */  public void unsubscribe(Topic topic, ScribeClient client) {    Logger.log(endpoint.getId() + ": Unsubscribing client " + client + " from topic " + topic,Logger.EVENT_LOG);    if (topics.get(topic) != null) {      TopicManager manager = (TopicManager) topics.get(topic);      // if this is the last client and there are no children,      // then we unsubscribe from the topic      if (manager.removeClient(client)) {        topics.remove(topic);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -