📄 topicimpl.java
字号:
/* * JORAM: Java(TM) Open Reliable Asynchronous Messaging * Copyright (C) 2001 - 2006 ScalAgent Distributed Technologies * Copyright (C) 2003 - 2004 Bull SA * Copyright (C) 1996 - 2000 Dyade * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Initial developer(s): Frederic Maistre (INRIA) * Contributor(s): ScalAgent Distributed Technologies */package org.objectweb.joram.mom.dest;import java.util.Enumeration;import java.util.Hashtable;import java.util.Vector;import java.util.Properties;import fr.dyade.aaa.agent.AgentId;import fr.dyade.aaa.agent.AgentServer;import fr.dyade.aaa.agent.Channel;import fr.dyade.aaa.agent.DeleteNot;import fr.dyade.aaa.agent.Notification;import fr.dyade.aaa.agent.UnknownAgent;import fr.dyade.aaa.agent.UnknownNotificationException;import org.objectweb.joram.shared.admin.*;import org.objectweb.joram.mom.notifications.*;import org.objectweb.joram.mom.notifications.AdminReply;import org.objectweb.joram.shared.excepts.*;import org.objectweb.joram.shared.messages.Message;import org.objectweb.joram.shared.selectors.*;import org.objectweb.joram.mom.MomTracing;import org.objectweb.util.monolog.api.BasicLevel;/** * The <code>TopicImpl</code> class implements the MOM topic behaviour, * basically distributing the received messages to subscribers. * <p> * A Topic might be part of a hierarchy; if it is the case, and if the topic * is not on top of that hierarchy, it will have a father to forward messages * to. * <p> * A topic might also be part of a cluster; if it is the case, it will have * friends to forward messages to. * <p> * A topic can't be part of a hierarchy and of a cluster at the same time. */public class TopicImpl extends DestinationImpl implements TopicImplMBean { /** Identifier of this topic's father, if any. */ protected AgentId fatherId = null; /** Vector of cluster fellows, if any. */ protected Vector friends = null; /** Vector of subscribers' identifiers. */ protected Vector subscribers; /** Table of subscribers' selectors. */ protected Hashtable selectors; /** Internal boolean used for tagging local sendings. */ protected transient boolean alreadySentLocally; /** * Constructs a <code>TopicImpl</code> instance. * * @param destId Identifier of the agent hosting the topic. * @param adminId Identifier of the administrator of the topic. * @param prop The initial set of properties. */ public TopicImpl(AgentId destId, AgentId adminId, Properties prop) { super(destId, adminId, prop); subscribers = new Vector(); selectors = new Hashtable(); } /** * Returns a string representation of this destination. */ public String toString() { return "TopicImpl:" + destId.toString(); } /** * Distributes the received notifications to the appropriate reactions. * * @exception UnknownNotificationException If a received notification is * unexpected by the topic. */ public void react(AgentId from, Notification not) throws UnknownNotificationException { alreadySentLocally = false; int reqId = -1; if (not instanceof AbstractRequest) reqId = ((AbstractRequest) not).getRequestId(); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "--- " + this + ": got " + not.getClass().getName() + " with id: " + reqId + " from: " + from.toString()); try { if (not instanceof ClusterRequest) doReact(from, (ClusterRequest) not); else if (not instanceof ClusterTest) doReact(from, (ClusterTest) not); else if (not instanceof ClusterAck) doReact(from, (ClusterAck) not); else if (not instanceof ClusterNot) doReact(from, (ClusterNot) not); else if (not instanceof UnclusterRequest) doReact(from, (UnclusterRequest) not); else if (not instanceof UnclusterNot) doReact(from, (UnclusterNot) not); else if (not instanceof SetFatherRequest) doReact(from, (SetFatherRequest) not); else if (not instanceof FatherTest) doReact(from, (FatherTest) not); else if (not instanceof FatherAck) doReact(from, (FatherAck) not); else if (not instanceof UnsetFatherRequest) doReact(from, (UnsetFatherRequest) not); else if (not instanceof Monit_GetSubscriptions) doReact(from, (Monit_GetSubscriptions) not); else if (not instanceof Monit_GetFather) doReact(from, (Monit_GetFather) not); else if (not instanceof Monit_GetCluster) doReact(from, (Monit_GetCluster) not); else if (not instanceof SubscribeRequest) doReact(from, (SubscribeRequest) not); else if (not instanceof UnsubscribeRequest) doReact(from, (UnsubscribeRequest) not); else if (not instanceof TopicForwardNot) doReact(from, (TopicForwardNot) not); else if (not instanceof DestinationAdminRequestNot) doReact(from, (DestinationAdminRequestNot) not); else super.react(from, not); } // MOM exceptions are sent to the requester. catch (MomException exc) { if (MomTracing.dbgDestination.isLoggable(BasicLevel.WARN)) MomTracing.dbgDestination.log(BasicLevel.WARN, exc); AbstractRequest req = (AbstractRequest) not; Channel.sendTo(from, new ExceptionReply(req, exc)); } } /** * Method implementing the reaction to a <code>ClusterRequest</code> * instance requesting to add a topic to the cluster, or to set a * cluster with a given topic. * * @exception AccessException If the requester is not an administrator. */ protected void doReact(AgentId from, ClusterRequest req) throws AccessException { if (! isAdministrator(from)) throw new AccessException("ADMIN right not granted"); String info = null; if (fatherId != null) { info = strbuf.append("Request [").append(req.getClass().getName()) .append("], sent to Topic [").append(destId) .append("], successful [false]: topic part of a hierarchy").toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(req, false, info)); return; } AgentId newFriendId = req.getTopicId(); if (friends == null) { // state change, so save. setSave(); friends = new Vector(); } if (friends.contains(newFriendId) || destId.equals(newFriendId)) { info = strbuf.append("Request [").append(req.getClass().getName()) .append("], sent to Topic [").append(destId) .append("], successful [false]: joining topic already") .append(" part of cluster").toString(); strbuf.setLength(0); Channel.sendTo(from, new AdminReply(req, false, info)); return; } ClusterTest not = new ClusterTest(req, from); Channel.sendTo(newFriendId, not); } /** * Method implementing the reaction to a <code>ClusterTest</code> * notification sent by a fellow topic for testing if this topic might be * part of a cluster. */ protected void doReact(AgentId from, ClusterTest not) { String info = null; // The topic is already part of a cluster: can't join an other cluster. if (friends != null && ! friends.isEmpty()) { info = strbuf.append("Topic [").append(destId) .append("] can't join cluster of topic [").append(from) .append("] as it is already part of a cluster").toString(); strbuf.setLength(0); Channel.sendTo(from, new ClusterAck(not, false, info)); // The topic is already part of a hierarchy: can't join a cluster. } else if (fatherId != null) { info = strbuf.append("Topic [").append(destId) .append("] can't join cluster of topic [").append(from) .append("] as it is already part of a hierarchy").toString(); strbuf.setLength(0); Channel.sendTo(from, new ClusterAck(not, false, info)); // The topic is free: joining the cluster. } else { // state change, so save. setSave(); friends = new Vector(); friends.add(from); info = strbuf.append("Topic [").append(destId) .append("] ok for joining cluster of topic [").append(from) .append(']').toString(); strbuf.setLength(0); Channel.sendTo(from, new ClusterAck(not, true, info)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic " + destId.toString() + " joins cluster" + "cluster of topic " + from.toString()); } } /** * Method implementing the reaction to a <code>ClusterAck</code> * notification sent by a topic requested to join the cluster. */ protected void doReact(AgentId from, ClusterAck ack) { // The topic does not accept to join the cluster: doing nothing. if (! ack.ok) { Channel.sendTo(ack.requester, new AdminReply(ack.request, false, ack.info)); return; } AgentId fellowId; ClusterNot fellowNot; ClusterNot newFriendNot = new ClusterNot(from); for (int i = 0; i < friends.size(); i++) { fellowId = (AgentId) friends.get(i); fellowNot = new ClusterNot(fellowId); // Notifying the joining topic of the current fellow. Channel.sendTo(from, fellowNot); // Notifying the current fellow of the joining topic. Channel.sendTo(fellowId, newFriendNot); } // state change, so save. setSave(); friends.add(from); String info = strbuf.append("Request [") .append(ack.request.getClass().getName()) .append("], sent to Topic [").append(destId) .append("], successful [true]: topic [") .append(from).append("] joined cluster").toString(); strbuf.setLength(0); Channel.sendTo(ack.requester, new AdminReply(ack.request, true, info)); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, info); } /** * Method implementing the reaction to a <code>ClusterNot</code> * notification sent by a fellow topic for notifying this topic * of a new cluster fellow. */ protected void doReact(AgentId from, ClusterNot not) { // state change, so save. setSave(); friends.add(not.topicId); if (MomTracing.dbgDestination.isLoggable(BasicLevel.DEBUG)) MomTracing.dbgDestination.log(BasicLevel.DEBUG, "Topic " + not.topicId.toString() + " set as a fellow."); } /** * Method implementing the reaction to an <code>UnclusterRequest</code> * instance requesting this topic to leave the cluster it is part of. * * @exception AccessException If the requester is not an administrator.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -