📄 topichandler.java
字号:
/*------------------------------------------------------------------------------Name: TopicHandler.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.Map;import java.util.Properties;import java.util.TreeMap;import java.util.Set;import java.util.HashSet;import java.util.ArrayList;import java.util.Iterator;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.checkpoint.I_Checkpoint;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.engine.qos.ConnectQosServer;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.engine.query.plugins.QueueQueryPlugin;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.engine.queuemsg.TopicEntry;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.qos.TopicProperty;import org.xmlBlaster.util.qos.HistoryQos;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.qos.StatusQosData;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.client.qos.ConnectQos;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.authentication.Authenticate;import org.xmlBlaster.authentication.SubjectInfo;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.xml2java.XmlKey;import org.xmlBlaster.engine.qos.PublishQosServer;import org.xmlBlaster.engine.qos.EraseQosServer;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.qos.AccessFilterQos;import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import org.xmlBlaster.engine.distributor.I_MsgDistributor;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.mime.I_AccessFilter;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;/** * A topic handles all MsgUnit entries of same oid and its subscribers. * <p> * This handler has the state UNCONFIGURED | UNREFERENCED | ALIVE | DEAD, see * the boolean state access methods for a description * </p> * The topicHandler access is only over TopicAccessor which assures that most one thread * enters TopicHandler to avoid synchronization problems. * * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.message.lifecycle.html">The engine.message.lifecylce requirement</a> * @see org.xmlBlaster.test.topic.TestTopicLifeCycle * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class TopicHandler implements I_Timeout, TopicHandlerMBean //, I_ChangeCallback{ private String ME = "TopicHandler"; private final ServerScope serverScope; private static Logger log = Logger.getLogger(TopicHandler.class.getName()); private final ContextNode contextNode; private boolean dyingInProgress = false; /** The unique identifier of this topic e.g. "/node/heron/topic/Hello" */ private final String id; /** The broker which manages me */ private final RequestBroker requestBroker; private TopicEntry topicEntry; // persistence storage entry // Default is that a single client can subscribe the same message multiple times // private boolean allowMultiSubscriptionPerClient = serverScope.getProperty().get("Engine.allowMultiSubscriptionPerClient", true); private I_MsgDistributor distributor; /** * This map knows all clients which have subscribed on this message content * and knows all individual wishes of the subscription (QoS). * * The map contains SubscriptionInfo objects. * * It is a TreeMap, that means it keeps order information. * TODO: express order attribute so that the first client will be served first. * * key = a unique key identifying the subscription * value = SubscriptionInfo object */ final private Map subscriberMap = new TreeMap(); /** Do error recovery if message can't be delivered and we give it up */ /** * MsgUnit references are stored in a persistent history queue. */ private I_Queue historyQueue; private SessionName creatorSessionName; /** The configuration for this TopicHandler */ private TopicProperty topicProperty; private I_Map msgUnitCache; /** The xmlKey with parsed DOM tree, is null in state=UNCONFIGURED */ private XmlKey xmlKey; /** Attribute oid of key tag: <key oid="..."> </key> */ private String uniqueKey; /** This holds the quick parsed key information, if you need the DOM use xmlKey instead */ private MsgKeyData msgKeyData; private boolean handlerIsNewCreated=true; // a little helper showing if topic is new created private boolean isRegisteredInBigXmlDom = false; private boolean isHistoryHandling; // marker if we are working on the history queue, whe have to prevent topic status changes triggered which would spoil our publish thread /** * This topic is destroyed after given timeout * The timer is activated on state change to UNREFERENCED * and removed on change to ALIVE */ private Timeout destroyTimer; private Timestamp timerKey = null; public final static int UNDEF = -1; public final static int UNCONFIGURED = 0; public final static int ALIVE = 1; public final static int UNREFERENCED = 2; public final static int SOFT_ERASED = 3; public final static int DEAD = 4; private int state = UNDEF; private I_SubscriptionListener subscriptionListener; private Object msgUnitWrapperUnderConstructionMutex = new Object(); private MsgUnitWrapper msgUnitWrapperUnderConstruction; /** this is used for administrative gets (queries on callback queue) */ private QueueQueryPlugin queueQueryPlugin; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; private boolean administrativeInitialize; private boolean clientTagLog = false; protected Object clone() { throw new RuntimeException("TopicHandler NO CLONEING PLEASE"); } /** * Use this constructor if a yet unknown object is fed by method publish(). * <p /> * You should call publish() thereafter * @param requestBroker * @param publisherSessionInfo Is null if created by subscription * @param a MsgUnitWrapper containing the CORBA MsgUnit data container */ public TopicHandler(RequestBroker requestBroker, SessionInfo publisherSessionInfo, String uniqueKey) throws XmlBlasterException { this.serverScope = requestBroker.getServerScope(); if (uniqueKey == null) throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Invalid constructor parameters"); this.uniqueKey = uniqueKey; this.id = this.serverScope.getNodeId() + "/" + ContextNode.TOPIC_MARKER_TAG + "/" + this.uniqueKey; this.ME = this.serverScope.getLogPrefix() + "/" + ContextNode.TOPIC_MARKER_TAG + "/" + this.uniqueKey; // JMX does not allow commas ',' String instanceName = this.serverScope.validateJmxValue(this.uniqueKey); this.contextNode = new ContextNode(ContextNode.TOPIC_MARKER_TAG, instanceName, this.serverScope.getContextNode()); this.requestBroker = requestBroker; this.destroyTimer = this.serverScope.getTopicTimer(); // this.msgErrorHandler = new MsgTopicErrorHandler(this.glob, this); toUnconfigured(); // JMX register "topic/hello" this.mbeanHandle = this.serverScope.registerMBean(this.contextNode, this); if (publisherSessionInfo == null) { if (log.isLoggable(Level.FINER)) log.fine(ME+": Creating new TopicHandler '" + uniqueKey + "' because of subscription."); } else { if (log.isLoggable(Level.FINER)) log.fine(ME+": Creating new TopicHandler '" + uniqueKey + "' because of publish."); } // mimeType and content remains unknown until first data is fed } /** * The unique identifier of this topic e.g. "/node/heron/topic/Hello" */ public String getId() { return this.id; } /** * The unique name of this topic instance. * @return Never null, for example "/xmlBlaster/node/heron/topic/hello" */ public final ContextNode getContextNode() { return this.contextNode; } /** * Initialize the messageUnit cache and the history queue for this topic */ private void administrativeInitialize(MsgKeyData msgKeyData, MsgQosData publishQos, PublishQosServer publishQosServer) throws XmlBlasterException { if (publishQosServer.getTopicEntry() != null) { this.topicEntry = publishQosServer.getTopicEntry(); // Call from persistent layer, reuse the TopicEntry if (log.isLoggable(Level.FINE)) log.fine(ME+": Reuse TopicEntry persistence handle"); if (log.isLoggable(Level.FINEST)) log.finest(ME+": Reuse TopicEntry persistence handle: " + this.topicEntry.toXml()); } if (this.msgKeyData == null) { this.msgKeyData = msgKeyData; } if (log.isLoggable(Level.FINEST)) log.finest(ME+": administrativeInitialize()" + publishQos.toXml()); this.creatorSessionName = publishQos.getSender(); this.topicProperty = publishQos.getTopicProperty(); startupMsgstore(); // Todo: this needs to be done after TopicHandler is created startupHistoryQueue(); if (isUnconfigured()) { // Startup of topic if (!hasCacheEntries() && !hasExactSubscribers()) { toUnreferenced(true, publishQosServer.isFromPersistenceStore()); } else { toAlive(); } } if (true /*log.INFO*/) { long maxEntriesHistory = this.topicProperty.getHistoryQueueProperty().getMaxEntries(); String hist = (maxEntriesHistory > 0) ? "history/maxEntries="+maxEntriesHistory : "message history is switched off with queue/history/maxEntries=0";
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -