⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topicimpl.java

📁 一个类似于openJMS分布在ObjectWeb之下的JMS消息中间件。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2003 - 2004 Bull SA * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. *  * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU * Lesser General Public License for more details. *  * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.mom.dest;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import java.util.Properties;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.AgentServer;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.agent.DeleteNot;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.UnknownAgent;import fr.dyade.aaa.agent.UnknownNotificationException;import org.objectweb.joram.shared.admin.*;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.mom.notifications.AdminReply;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.Message;import org.objectweb.joram.shared.selectors.*;import org.objectweb.joram.mom.MomTracing;import org.objectweb.util.monolog.api.BasicLevel;/** * The <code>TopicImpl</code> class implements the MOM topic behaviour, * basically distributing the received messages to subscribers. * <p> * A Topic might be part of a hierarchy; if it is the case, and if the topic * is not on top of that hierarchy, it will have a father to forward messages * to. * <p> * A topic might also be part of a cluster; if it is the case, it will have * friends to forward messages to. * <p> * A topic can't be part of a hierarchy and of a cluster at the same time. */public class TopicImpl extends DestinationImpl implements TopicImplMBean {  /** Identifier of this topic's father, if any. */  protected AgentId fatherId = null;  /** Vector of cluster fellows, if any. */  protected Vector friends = null;    /** Vector of subscribers' identifiers. */  protected Vector subscribers;  /** Table of subscribers' selectors. */  protected Hashtable selectors;  /** Internal boolean used for tagging local sendings. */  protected transient boolean alreadySentLocally;  /**   * Constructs a <code>TopicImpl</code> instance.   *   * @param destId  Identifier of the agent hosting the topic.   * @param adminId  Identifier of the administrator of the topic.   * @param prop     The initial set of properties.   */  public TopicImpl(AgentId destId, AgentId adminId, Properties prop) {    super(destId, adminId, prop);    subscribers = new Vector();    selectors = new Hashtable();  }  /**   * Returns a string representation of this destination.   */  public String toString() {    return "TopicImpl:" + destId.toString();  }  /**   * Distributes the received notifications to the appropriate reactions.   *   * @exception UnknownNotificationException  If a received notification is   *              unexpected by the topic.   */  public void react(AgentId from, Notification not)              throws UnknownNotificationException {    alreadySentLocally = false;    int reqId = -1;    if (not instanceof AbstractRequest)      reqId = ((AbstractRequest) not).getRequestId();    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this                                    + ": got " + not.getClass().getName()                                    + " with id: " + reqId                                    + " from: " + from.toString());    try {      if (not instanceof ClusterRequest)        doReact(from, (ClusterRequest) not);      else if (not instanceof ClusterTest)        doReact(from, (ClusterTest) not);      else if (not instanceof ClusterAck)        doReact(from, (ClusterAck) not);      else if (not instanceof ClusterNot)        doReact(from, (ClusterNot) not);      else if (not instanceof UnclusterRequest)        doReact(from, (UnclusterRequest) not);      else if (not instanceof UnclusterNot)        doReact(from, (UnclusterNot) not);      else if (not instanceof SetFatherRequest)        doReact(from, (SetFatherRequest) not);      else if (not instanceof FatherTest)        doReact(from, (FatherTest) not);      else if (not instanceof FatherAck)        doReact(from, (FatherAck) not);      else if (not instanceof UnsetFatherRequest)        doReact(from, (UnsetFatherRequest) not);      else if (not instanceof Monit_GetSubscriptions)        doReact(from, (Monit_GetSubscriptions) not);      else if (not instanceof Monit_GetFather)        doReact(from, (Monit_GetFather) not);      else if (not instanceof Monit_GetCluster)        doReact(from, (Monit_GetCluster) not);      else if (not instanceof SubscribeRequest)        doReact(from, (SubscribeRequest) not);      else if (not instanceof UnsubscribeRequest)        doReact(from, (UnsubscribeRequest) not);      else if (not instanceof TopicForwardNot)        doReact(from, (TopicForwardNot) not);      else if (not instanceof DestinationAdminRequestNot)        doReact(from, (DestinationAdminRequestNot) not);      else        super.react(from, not);    }    // MOM exceptions are sent to the requester.    catch (MomException exc) {      if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN))        MomTracing.dbgDestination.log(BasicLevel.WARN, exc);      AbstractRequest req = (AbstractRequest) not;      Channel.sendTo(from, new ExceptionReply(req, exc));    }  }  /**   * Method implementing the reaction to a <code>ClusterRequest</code>   * instance requesting to add a topic to the cluster, or to set a   * cluster with a given topic.   *   * @exception AccessException  If the requester is not an administrator.   */  protected void doReact(AgentId from, ClusterRequest req)                 throws AccessException  {    if (! isAdministrator(from))      throw new AccessException("ADMIN right not granted");    String info = null;    if (fatherId != null) {      info = strbuf.append("Request [").append(req.getClass().getName())        .append("], sent to Topic [").append(destId)        .append("], successful [false]: topic part of a hierarchy").toString();      strbuf.setLength(0);      Channel.sendTo(from, new AdminReply(req, false, info));      return;    }    AgentId newFriendId = req.getTopicId();    if (friends == null) {      // state change, so save.      setSave();      friends = new Vector();    }    if (friends.contains(newFriendId) || destId.equals(newFriendId)) {      info = strbuf.append("Request [").append(req.getClass().getName())        .append("], sent to Topic [").append(destId)        .append("], successful [false]: joining topic already")        .append(" part of cluster").toString();      strbuf.setLength(0);      Channel.sendTo(from, new AdminReply(req, false, info));      return;    }    ClusterTest not = new ClusterTest(req, from);    Channel.sendTo(newFriendId, not);  }  /**   * Method implementing the reaction to a <code>ClusterTest</code>   * notification sent by a fellow topic for testing if this topic might be   * part of a cluster.   */  protected void doReact(AgentId from, ClusterTest not)  {    String info = null;    // The topic is already part of a cluster: can't join an other cluster.    if (friends != null && ! friends.isEmpty()) {      info = strbuf.append("Topic [").append(destId)        .append("] can't join cluster of topic [").append(from)        .append("] as it is already part of a cluster").toString();      strbuf.setLength(0);      Channel.sendTo(from, new ClusterAck(not, false, info));    // The topic is already part of a hierarchy: can't join a cluster.    } else if (fatherId != null) {      info = strbuf.append("Topic [").append(destId)        .append("] can't join cluster of topic [").append(from)        .append("] as it is already part of a hierarchy").toString();      strbuf.setLength(0);      Channel.sendTo(from, new ClusterAck(not, false, info));    // The topic is free: joining the cluster.    } else {      // state change, so save.      setSave();      friends = new Vector();      friends.add(from);      info = strbuf.append("Topic [").append(destId)        .append("] ok for joining cluster of topic [").append(from)        .append(']').toString();      strbuf.setLength(0);      Channel.sendTo(from, new ClusterAck(not, true, info));      if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))        MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic "                                      + destId.toString() + " joins cluster"                                      + "cluster of topic " + from.toString());    }  }  /**   * Method implementing the reaction to a <code>ClusterAck</code>   * notification sent by a topic requested to join the cluster.   */   protected void doReact(AgentId from, ClusterAck ack)  {     // The topic does not accept to join the cluster: doing nothing.    if (! ack.ok) {      Channel.sendTo(ack.requester,                     new AdminReply(ack.request, false, ack.info));      return;    }      AgentId fellowId;    ClusterNot fellowNot;    ClusterNot newFriendNot = new ClusterNot(from);    for (int i = 0; i < friends.size(); i++) {      fellowId = (AgentId) friends.get(i);      fellowNot = new ClusterNot(fellowId);      // Notifying the joining topic of the current fellow.      Channel.sendTo(from, fellowNot);      // Notifying the current fellow of the joining topic.      Channel.sendTo(fellowId, newFriendNot);    }    // state change, so save.    setSave();    friends.add(from);    String info = strbuf.append("Request [")      .append(ack.request.getClass().getName())      .append("], sent to Topic [").append(destId)      .append("], successful [true]: topic [")      .append(from).append("] joined cluster").toString();    strbuf.setLength(0);    Channel.sendTo(ack.requester, new AdminReply(ack.request, true, info));    if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG, info);  }  /**   * Method implementing the reaction to a <code>ClusterNot</code>   * notification sent by a fellow topic for notifying this topic   * of a new cluster fellow.   */  protected void doReact(AgentId from, ClusterNot not)  {    // state change, so save.    setSave();    friends.add(not.topicId);          if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG))      MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic "                                    + not.topicId.toString()                                    + " set as a fellow.");  }   /**   * Method implementing the reaction to an <code>UnclusterRequest</code>   * instance requesting this topic to leave the cluster it is part of.   *   * @exception AccessException  If the requester is not an administrator.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -