⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topichandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
/*------------------------------------------------------------------------------Name:      TopicHandler.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.Map;import java.util.Properties;import java.util.TreeMap;import java.util.Set;import java.util.HashSet;import java.util.ArrayList;import java.util.Iterator;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.checkpoint.I_Checkpoint;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.engine.query.plugins.QueueQueryPlugin;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.engine.queuemsg.TopicEntry;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.qos.TopicProperty;import org.xmlBlaster.util.qos.HistoryQos;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.qos.StatusQosData;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.authentication.Authenticate;import org.xmlBlaster.authentication.SubjectInfo;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.xml2java.XmlKey;import org.xmlBlaster.engine.qos.PublishQosServer;import org.xmlBlaster.engine.qos.EraseQosServer;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.qos.AccessFilterQos;import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import org.xmlBlaster.engine.distributor.I_MsgDistributor;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.mime.I_AccessFilter;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;/** * A topic handles all MsgUnit entries of same oid and its subscribers. * <p> * This handler has the state UNCONFIGURED | UNREFERENCED | ALIVE | DEAD, see * the boolean state access methods for a description * </p> * The topicHandler access is only over TopicAccessor which assures that most one thread * enters TopicHandler to avoid synchronization problems. * * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The engine.message.lifecylce requirement</a> * @see org.xmlBlaster.test.topic.TestTopicLifeCycle * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class TopicHandler implements I_Timeout, TopicHandlerMBean //, I_ChangeCallback{   private String ME = "TopicHandler";   private final ServerScope serverScope;   private static Logger log = Logger.getLogger(TopicHandler.class.getName());   private final ContextNode contextNode;   private boolean dyingInProgress = false;   /** The unique identifier of this topic e.g. "/node/heron/topic/Hello" */   private final String id;   /** The broker which manages me */   private final RequestBroker requestBroker;   private TopicEntry topicEntry; // persistence storage entry   // Default is that a single client can subscribe the same message multiple times   // private boolean allowMultiSubscriptionPerClient = serverScope.getProperty().get("Engine.allowMultiSubscriptionPerClient", true);   private I_MsgDistributor distributor;   /**    * This map knows all clients which have subscribed on this message content    * and knows all individual wishes of the subscription (QoS).    *    * The map contains SubscriptionInfo objects.    *    * It is a TreeMap, that means it keeps order information.    * TODO: express order attribute so that the first client will be served first.    *    * key   = a unique key identifying the subscription    * value = SubscriptionInfo object    */   final private Map subscriberMap = new TreeMap();   /** Do error recovery if message can't be delivered and we give it up */   /**    * MsgUnit references are stored in a persistent history queue.    */   private I_Queue historyQueue;   private SessionName creatorSessionName;   /** The configuration for this TopicHandler */   private TopicProperty topicProperty;   private I_Map msgUnitCache;   /** The xmlKey with parsed DOM tree, is null in state=UNCONFIGURED */   private XmlKey xmlKey;   /** Attribute oid of key tag: <key oid="..."> </key> */   private String uniqueKey;   /** This holds the quick parsed key information, if you need the DOM use xmlKey instead */   private MsgKeyData msgKeyData;   private boolean handlerIsNewCreated=true;  // a little helper showing if topic is new created   private boolean isRegisteredInBigXmlDom = false;   private boolean isHistoryHandling; // marker if we are working on the history queue, whe have to prevent topic status changes triggered which would spoil our publish thread   /**    * This topic is destroyed after given timeout    * The timer is activated on state change to UNREFERENCED    * and removed on change to ALIVE    */   private Timeout destroyTimer;   private Timestamp timerKey = null;   public final static int UNDEF = -1;   public final static int UNCONFIGURED = 0;   public final static int ALIVE = 1;   public final static int UNREFERENCED = 2;   public final static int SOFT_ERASED = 3;   public final static int DEAD = 4;   private int state = UNDEF;   private I_SubscriptionListener subscriptionListener;   private Object msgUnitWrapperUnderConstructionMutex = new Object();   private MsgUnitWrapper msgUnitWrapperUnderConstruction;   /** this is used for administrative gets (queries on callback queue) */   private QueueQueryPlugin queueQueryPlugin;   /** My JMX registration */   private JmxMBeanHandle mbeanHandle;   private boolean administrativeInitialize;   private boolean clientTagLog = false;   protected Object clone() {      throw new RuntimeException("TopicHandler NO CLONEING PLEASE");   }   /**    * Use this constructor if a yet unknown object is fed by method publish().    * <p />    * You should call publish() thereafter    * @param requestBroker    * @param publisherSessionInfo Is null if created by subscription    * @param a MsgUnitWrapper containing the CORBA MsgUnit data container    */   public TopicHandler(RequestBroker requestBroker, SessionInfo publisherSessionInfo, String uniqueKey) throws XmlBlasterException {      this.serverScope = requestBroker.getServerScope();      if (uniqueKey == null)         throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invalid constructor parameters");      this.uniqueKey = uniqueKey;      this.id = this.serverScope.getNodeId() + "/" + ContextNode.TOPIC_MARKER_TAG + "/" + this.uniqueKey;      this.ME = this.serverScope.getLogPrefix() + "/" + ContextNode.TOPIC_MARKER_TAG + "/" + this.uniqueKey;      // JMX does not allow commas ','      String instanceName = this.serverScope.validateJmxValue(this.uniqueKey);      this.contextNode = new ContextNode(ContextNode.TOPIC_MARKER_TAG, instanceName, this.serverScope.getContextNode());      this.requestBroker = requestBroker;      this.destroyTimer = this.serverScope.getTopicTimer();      // this.msgErrorHandler = new MsgTopicErrorHandler(this.glob, this);      toUnconfigured();      // JMX register "topic/hello"      this.mbeanHandle = this.serverScope.registerMBean(this.contextNode, this);      if (publisherSessionInfo == null) {         if (log.isLoggable(Level.FINER)) log.fine(ME+": Creating new TopicHandler '" + uniqueKey + "' because of subscription.");      }      else {         if (log.isLoggable(Level.FINER)) log.fine(ME+": Creating new TopicHandler '" + uniqueKey + "' because of publish.");      }      // mimeType and content remains unknown until first data is fed   }   /**    * The unique identifier of this topic e.g. "/node/heron/topic/Hello"    */   public String getId() {      return this.id;   }   /**    * The unique name of this topic instance.    * @return Never null, for example "/xmlBlaster/node/heron/topic/hello"    */   public final ContextNode getContextNode() {      return this.contextNode;   }   /**    * Initialize the messageUnit cache and the history queue for this topic    */   private void administrativeInitialize(MsgKeyData msgKeyData, MsgQosData publishQos,                             PublishQosServer publishQosServer) throws XmlBlasterException {      if (publishQosServer.getTopicEntry() != null) {         this.topicEntry = publishQosServer.getTopicEntry(); // Call from persistent layer, reuse the TopicEntry         if (log.isLoggable(Level.FINE)) log.fine(ME+": Reuse TopicEntry persistence handle");         if (log.isLoggable(Level.FINEST)) log.finest(ME+": Reuse TopicEntry persistence handle: " + this.topicEntry.toXml());      }      if (this.msgKeyData == null) {         this.msgKeyData = msgKeyData;      }      if (log.isLoggable(Level.FINEST)) log.finest(ME+": administrativeInitialize()" + publishQos.toXml());      this.creatorSessionName = publishQos.getSender();      this.topicProperty = publishQos.getTopicProperty();      startupMsgstore();      // Todo: this needs to be done after TopicHandler is created      startupHistoryQueue();      if (isUnconfigured()) { // Startup of topic         if (!hasCacheEntries() && !hasExactSubscribers()) {            toUnreferenced(true, publishQosServer.isFromPersistenceStore());         }         else {            toAlive();         }      }      if (true /*log.INFO*/) {         long maxEntriesHistory = this.topicProperty.getHistoryQueueProperty().getMaxEntries();         String hist = (maxEntriesHistory > 0) ? "history/maxEntries="+maxEntriesHistory : "message history is switched off with queue/history/maxEntries=0";

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -