📄 subscriptioninfo.java
字号:
/*------------------------------------------------------------------------------Name: SubscriptionInfo.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Handles exactly one subscritpion (client reference and QoS of this subscritionAuthor: xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.AccessFilterQos;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.key.KeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.IsoDateParser;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import java.util.ArrayList;import java.util.Map;/** * This is a container to hold references on all interesting data * concerning a subscription of exactly one topic from exactly one client. * <p /> * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/interface.subscribe.html">The interface.subscribe requirement</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class SubscriptionInfo implements /*I_AdminSubscription,*/ SubscriptionInfoMBean /* implements Comparable see SORT_PROBLEM */{ private String ME = "SubscriptionInfo"; private ContextNode contextNode; /** The global handle */ private ServerScope glob; /** Logging to channel "core" */ private static Logger log = Logger.getLogger(SubscriptionInfo.class.getName()); /** The initiatior of this subscription */ private SessionInfo sessionInfo; /** reference to keyData */ private KeyData keyData; /** reference to 'Quality of Service' of subscribe() / unSubscribe() */ private SubscribeQosServer subscribeQos; /** The unique key of a subscription (subscriptionId), which is a function of f(keyData,xmlQos). <br /> This is the returned id of a subscribe() invocation */ private String uniqueKey; /** reference to my managing container */ private TopicHandler topicHandler; /** A reference to the query subscription (XPATH), which created this subscription If the subscription was EXACT, querySub is null */ private SubscriptionInfo querySub; /** It it is a query subscription, we remember all subscriptions which resulted from this query */ private ArrayList childrenList; /** If duplicateUpdates=false is set we can check here how often this message is subscribed from the same client */ private int subscribeCounter = 0; // is incr/decr by fireSubscribeEvent() and fireUnSubscribeEvent() private long creationTime = System.currentTimeMillis(); /** uniqueId used to store this in queue */ private long persistenceId = -1L; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; private boolean isShutdown; /** * Use this constructor for an exact subscription. * @param sessionInfo The session which initiated this subscription * @param keyData The message meta info * @param qos This may be a SubscribeQosServer or a UnSubscribeQosServer instance */ public SubscriptionInfo(ServerScope glob, SessionInfo sessionInfo, KeyData keyData, SubscribeQosServer qos) throws XmlBlasterException { init(glob, sessionInfo, keyData, qos); } /** * Use this constructor it the subscription is a result of a XPath subscription * @param sessionInfo The session which initiated this subscription * @param querySub The XPATH query subscription which is has us as a child * @param keyData The matching key for the above querySub */ public SubscriptionInfo(ServerScope glob, SessionInfo sessionInfo, SubscriptionInfo querySub, KeyData keyData) throws XmlBlasterException { this.querySub = querySub; init(glob, sessionInfo, keyData, querySub.getSubscribeQosServer()); } private void init(ServerScope glob, SessionInfo sessionInfo, KeyData keyData, SubscribeQosServer qos) throws XmlBlasterException { this.glob = glob; this.sessionInfo = sessionInfo; this.keyData = keyData; this.subscribeQos = qos; if (this.sessionInfo == null) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "No sessionInfo passed"+toXml()); } if (this.subscribeQos == null) { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "No subscribeQos passed"+toXml()); } AccessFilterQos[] filterQos = this.subscribeQos.getAccessFilterArr(); if (filterQos != null) { for (int ii=0; ii<filterQos.length; ii++) { this.glob.getRequestBroker().getAccessPluginManager().addAccessFilterPlugin( filterQos[ii].getType(), filterQos[ii].getVersion()); } } initSubscriptionId(); // initialize the unique id this.uniqueKey ME += "-" + this.uniqueKey ; // For JMX instanceName may not contain "," String instanceName = this.glob.validateJmxValue(this.uniqueKey); this.contextNode = new ContextNode(ContextNode.SUBSCRIPTION_MARKER_TAG, instanceName, this.glob.getContextNode()); this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); if (log.isLoggable(Level.FINE)) log.fine("Created SubscriptionInfo '" + getSubscriptionId() + "' for client '" + sessionInfo.getSessionName().getRelativeName() + "' for topic '" + this.keyData.getOid() + "'"); } /** * If same client subscribes multiple times on same topic but wants * to suppress multi-updates (e.g. cluster node clients). */ public void incrSubscribeCounter() { subscribeCounter++; } /** * If same client subscribes multiple times on same topic but wants * to suppress multi-updates (e.g. cluster node clients). */ public void decrSubscribeCounter() { subscribeCounter--; } /** * If same client subscribes multiple times on same topic but wants * to suppress multi-updates (e.g. cluster node clients). */ public int getSubscribeCounter() { return subscribeCounter; } /** * The session info of the client who initiated this subscription * @return Never null, but the sessionInfo instance may be meanwhile shutdown */ public SessionInfo getSessionInfo() { return this.sessionInfo; } /** * My destination queue. */ public I_Queue getMsgQueue() { return getSessionInfo().getSessionQueue(); } /** * For this query subscription remember all resulted child subscriptions */ public synchronized void addSubscription(SubscriptionInfo subs) { if (this.childrenList == null) this.childrenList = new ArrayList(); this.childrenList.add(subs); } /** * For this query subscription remember all resulted subscriptions */ public synchronized void removeChildSubscription(SubscriptionInfo subs) { if (this.childrenList == null) return; boolean found = this.childrenList.remove(subs); if (!found) { log.severe("Failed to remove XPATH children subscription " + uniqueKey); Thread.dumpStack(); return; } if (log.isLoggable(Level.FINE)) log.fine("Removed XPATH " + uniqueKey + " children subscription "); // + subs.getSubscriptionId()); } /** * For this query subscription return all resulted subscriptions * @return null if not a query subscription with children */ public synchronized SubscriptionInfo[] getChildrenSubscriptions() { if (this.childrenList==null) return null; return (SubscriptionInfo[])this.childrenList.toArray(new SubscriptionInfo[this.childrenList.size()]); } public boolean isQuery() { return this.keyData.isQuery(); } /** * @return true if it is an exact subscription. Not a * query nor a domain subscriptions. */ public boolean isExact() { return !isQuery() && !isCreatedByQuerySubscription(); } /** * If true this is a child. It is automatically * generated by a query subscription. * @return */ public boolean isCreatedByQuerySubscription() { return querySub != null; } protected void finalize() { if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collect " + uniqueKey); } /** * @return Null if none configured */ public AccessFilterQos[] getAccessFilterArr() { if (this.subscribeQos == null) return null; return subscribeQos.getAccessFilterArr(); } /** * This must be called as soon as my TopicHandler handles me. * @param topicHandler I'm handled (lifetime) by this handler */ public void addTopicHandler(TopicHandler topicHandler) { if (topicHandler == null) { Thread.dumpStack(); log.severe("addTopicHandler with topicHandler==null seems to be strange"); } this.topicHandler = topicHandler; if (this.topicHandler != null) { if (log.isLoggable(Level.FINE)) log.fine("Assign to SubscriptionInfo '" + uniqueKey + "' for client '" + getSessionInfo().getId() + "' topic '" + this.topicHandler.getUniqueKey() + "'"); } } public TopicHandler getTopicHandler() { if (topicHandler == null) { Thread.dumpStack(); log.severe("addTopicHandler with topicHandler==null seems to be strange"); } return topicHandler; } /** * Time when this Subscription is invoked. * @return the creation time of this subscription (in millis) */ public long getCreationTime() { return this.creationTime; } /** * Telling my container that i am not subscribing any more. */ void removeSubscribe() { if (topicHandler == null) { if (!isQuery()) { log.warning("The id=" + uniqueKey + " has no TopicHandler which takes care of it: " + toXml()); Thread.dumpStack(); } return; } topicHandler.removeSubscriber(uniqueKey); shutdown(); } /** * @return The message wrapper object * @exception If no MsgUnitWrapper available public MsgUnitWrapper getMsgUnitWrapper() throws XmlBlasterException { if (topicHandler == null) { if (!isQuery()) { log.warn(ME, "Key oid=" + uniqueKey + " has no TopicHandler which takes care of it: " + toXml()); Thread.dumpStack(); } throw new XmlBlasterException(ME + ".NoMsgUnitWrapper", "Key oid=" + uniqueKey + " has no TopicHandler which takes care of it"); } return topicHandler.getMsgUnitWrapper(); } */ /** * Compare method needed for Interface Comparable. * * This determines the sorting order, by which the * client receive their updates. * For now, the client which subscribed first, is served first */ /*SORT_PROBLEM: Works fine with TreeSet, but with TreeMap i get the key here :-( public int compareTo(Object o) { SubscriptionInfo sub = (SubscriptionInfo)o; long diff = sub.getCreationTime() - getCreationTime(); if (diff < 0L) return -1; else if (diff > 0L) return +1; else return 0; } */ /** * Access on KeyData object * @return KeyData object */ public KeyData getKeyData() { return keyData; } /** * The oid of the message we belong to */ public String getKeyOid() { if (keyData != null) { return keyData.getOid(); } return null; } /** * Access on SubscribeQosServer object * @return SubscribeQosServer object or null */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -