📄 subscriptioninfo.java
字号:
public QueryQosData getQueryQosData() { if (this.subscribeQos == null) return null; return this.subscribeQos.getData(); } /** * @return null if none found */ public Map getQueryQosDataClientProperties() { QueryQosData queryQosData = getQueryQosData(); if (queryQosData != null) return queryQosData.getClientProperties(); return null; } /** * Supports limited reconfiguration with the given newQos. * @param newQos The new QueryQosData to use */ public void update(SubscribeQosServer newQos) { if (this.subscribeQos == null) { this.subscribeQos = newQos; if (log.isLoggable(Level.FINE)) log.fine("Updated SubscribeQos for " + getId()); } else { AccessFilterQos[] arr = newQos.getAccessFilterArr(); if (log.isLoggable(Level.FINE)) log.fine("Updated SubscribeQos AccessFilterArr for " + getId()); this.subscribeQos.getData().setFilters(arr); } /* QuerySpecQos[] qarr = subscribeQos.getQuerySpecArr(); if (qarr != null) { ... } */ } /** * @return Can be null */ public SubscribeQosServer getSubscribeQosServer() { return this.subscribeQos; } /** * Accessing a unique subscription id generated for this SubscriptionInfo. * <p /> * The key will be generated on first invocation * @return A unique key for this particular subscription */ public String getSubscriptionId() { if (this.uniqueKey == null) throw new IllegalArgumentException(ME+".getSubscriptionId() is not initialized"); return this.uniqueKey; } /** * For JMX the uniqueKey may not contain commas ','. */ private void initSubscriptionId() throws XmlBlasterException { if (this.uniqueKey == null) { if (this.querySub != null) { StringBuffer buf = new StringBuffer(126); Timestamp tt = new Timestamp(); // Using prefix of my parent XPATH subscription object: buf.append(this.querySub.getSubscriptionId()).append(":").append(String.valueOf(tt.getTimestamp())); this.uniqueKey = buf.toString(); if (log.isLoggable(Level.FINE)) log.fine("Generated child subscription ID=" + this.uniqueKey); } else { this.uniqueKey = SubscriptionInfo.generateUniqueKey(keyData, this.subscribeQos.getData(), this.glob.useCluster()); if (log.isLoggable(Level.FINE)) log.fine("Generated subscription ID=" + this.uniqueKey); } } } /** * Accessing the unique subscription id from method subscribe(), which was the reason for this SubscriptionInfo * @return The subscription id which is used in updateQos - $lt;subscritpionId> */ public String getSubSourceSubscriptionId() throws XmlBlasterException { if (querySub != null) { return querySub.getSubscriptionId(); } return getSubscriptionId(); } /** * Cleanup subscription. */ public void shutdown() { if (this.isShutdown) return; synchronized (this) { // to prevent two threads calling unregisterMBean() if (this.isShutdown) return; this.isShutdown = true; } this.glob.unregisterMBean(this.mbeanHandle); if (this.querySub != null) { this.querySub.removeChildSubscription(this); } //this.subscribeQos = null; Not setting to null because of multi thread access // Keep keyData for further processing // Keep uniqueKey for further processing } public boolean isShutdown() { return this.isShutdown; } /** * Test if this id is a subscribeId (starts with "__subId:") */ static boolean isSubscribeId(String id) { if (id == null) return false; return id.startsWith(Constants.SUBSCRIPTIONID_PREFIX) ? true : false; } /** * This static method may be used from external objects to get the unique key * of a subscription. * <p /> * For JMX the uniqueKey may not contain commas ','. * * @param clusterWideUnique If false the key is unique for this xmlBlaster instance only * @return A unique key for this particular subscription, for example:<br> * <code>__subId:heron-53</code> * @see org.xmlBlaster.util.qos.QueryQosData#generateSubscriptionId(String) */ private static String generateUniqueKey(KeyData keyData, QueryQosData xmlQos, boolean clusterWideUnique) throws XmlBlasterException { if (xmlQos.getSubscriptionId() != null && xmlQos.getSubscriptionId().length() > 0) { return xmlQos.getSubscriptionId(); // Client forced his own key } StringBuffer buf = new StringBuffer(126); buf.append(Constants.SUBSCRIPTIONID_PREFIX); if (clusterWideUnique) { // needs to be accepted by other cluster nodes buf.append(keyData.getGlobal().getNodeId().getId()).append("-"); } if (keyData.isQuery()) buf.append(keyData.getQueryType()); Timestamp tt = new Timestamp(); buf.append(String.valueOf(tt.getTimestamp())); return buf.toString(); } /** * * @param sessionName * @param xmlKey * @param subscribeQos * @throws XmlBlasterException * @see org.xmlBlaster.util.qos.QueryQosData#generateSubscriptionId(String) * @see generateUniqueKey */ public static void verifySubscriptionId(boolean isClusterNode, SessionName sessionName, QueryKeyData xmlKey, SubscribeQosServer subscribeQos) throws XmlBlasterException { if (subscribeQos.isRecoveredFromPersistenceStore()) return; String subscriptionId = subscribeQos.getSubscriptionId(); if (subscriptionId != null) { boolean isOk = true; //"__subId:client/joe/session/1-[your-unqiue-postfix]" if (!subscriptionId.startsWith(Constants.SUBSCRIPTIONID_PREFIX) || subscriptionId.length() < (Constants.SUBSCRIPTIONID_PREFIX.length()+5)) isOk = false; String tail = subscriptionId.substring(Constants.SUBSCRIPTIONID_PREFIX.length()); // "__subId:client/joe/session/1-XPATH://key" if (!tail.startsWith(sessionName.getRelativeName(true)) && // It could by a slave of a slave cluster node, so the check sessionName.getLoginName() is not enough !isClusterNode && //"__subId:heron-3456646466" for cluster slaves /*connectQos.isClusterNode()) &&*/ !tail.startsWith(sessionName.getLoginName()+"-")) isOk = false; if (!isOk) throw new XmlBlasterException(subscribeQos.getGlobal(), ErrorCode.USER_SUBSCRIBE_ID, "Your subscriptionId '" + subscriptionId + "' is invalid, we expect something like '" + subscribeQos.getData().generateSubscriptionId(sessionName, xmlKey)); } } /** * Dump state of this object into XML. * <br> * @return XML state of SubscriptionInfo */ public String toXml() { return toXml((String)null); } /** * Dump state of this object into XML. * <br> * @param extraOffset indenting of tags * @return XML state of SubscriptionInfo */ public String toXml(String extraOffset) { StringBuffer sb = new StringBuffer(2048); if (extraOffset == null) extraOffset = ""; String offset = Constants.OFFSET + extraOffset; sb.append(offset).append("<subscription id='").append(getSubscriptionId()).append("'"); sb.append(" sessionName='").append(getSessionInfo().getSessionName()).append("'"); if (this.topicHandler != null) { sb.append(" oid='").append(topicHandler.getUniqueKey()).append("'"); } if (this.querySub != null) { sb.append(" parent='").append(this.querySub.getSubscriptionId()).append("'"); } SubscriptionInfo[] childrenSubs = getChildrenSubscriptions(); if (childrenSubs != null) { sb.append(" numChilds='").append(childrenSubs.length).append("'"); } sb.append(" creationTime='" + IsoDateParser.getUTCTimestamp(this.creationTime) + "'"); sb.append(">"); //sb.append(offset).append("<SubscriptionInfo id='").append(getSubscriptionId()).append("'>"); //sb.append(offset + " <keyData oid='" + (keyData==null ? "null" : keyData.getUniqueKey()) + "'/>"); if (keyData != null) sb.append(keyData.toXml(extraOffset+Constants.INDENT)); if (this.subscribeQos != null) sb.append(this.subscribeQos.toXml(extraOffset+Constants.INDENT)); else sb.append(extraOffset+Constants.INDENT).append("<!-- subscribe qos is null ERROR -->"); //sb.append(offset).append(" <topicHandler id='").append((topicHandler==null ? "null" : topicHandler.getUniqueKey())).append("'/>"); //sb.append(offset).append(" <creationTime>").append(IsoDate...(this.creationTime)).append("</creationTime>"); if (childrenSubs != null) { for (int ii=0; ii<childrenSubs.length; ii++) { sb.append(offset).append(" <child>").append(childrenSubs[ii].getSubscriptionId()).append("</child>"); } } sb.append(offset).append("</subscription>"); return sb.toString(); } /** * Dump state of this object into XML. * <pre> * <subscribe id='_subId:1' sessionName='/node/heron/client/joe/-2' oid='HelloWorld' parent='_sub:XPATH-2'/> * </pre> * @param extraOffset indenting of tags * @return XML state of SubscriptionInfo */ public String toXmlSmall(String extraOffset) { StringBuffer sb = new StringBuffer(256); if (extraOffset == null) extraOffset = ""; String offset = Constants.OFFSET + extraOffset; sb.append(offset).append(" <subscription id='").append(getSubscriptionId()).append("'"); sb.append(" sessionName='").append(getSessionInfo().getSessionName()).append("'"); if (this.topicHandler != null) { sb.append(" oid='").append(topicHandler.getUniqueKey()).append("'"); } if (this.querySub != null) { sb.append(" parent='").append(this.querySub.getSubscriptionId()).append("'"); } if (this.childrenList != null) { synchronized (this) { sb.append(" numChilds='").append(this.childrenList.size()).append("'"); } } sb.append(" creationTime='" + IsoDateParser.getUTCTimestamp(this.creationTime) + "'"); sb.append("/>"); return sb.toString(); } /** * Gets the uniqueId for the persistence of this session. * @return the uniqueId used to identify this session as an entry * in the queue where it is stored (for persistent subscriptions). * If the session is not persistent it returns -1L. * */ public final long getPersistenceId() { return this.persistenceId; } /** * Sets the uniqueId used to retrieve this session from the persistence * @param persistenceId */ public final void setPersistenceId(long persistenceId) { this.persistenceId = persistenceId; } //++++++++++ Enforced by I_AdminSubscription ++++++++++++++++ public String getId() { return getSubscriptionId(); } public String getSessionName() { return getSessionInfo().getId(); } public String getTopicId() { if (this.topicHandler == null) return (getKeyOid()==null)?"":getKeyOid(); return this.topicHandler.getId(); } public String getParentSubscription() { if (this.querySub == null) return ""; return this.querySub.getSubscriptionId(); } public String getCreationTimestamp() { return IsoDateParser.getUTCTimestamp(this.creationTime); } public String getSubscribeQosStr() { return (this.subscribeQos==null) ? "" : this.subscribeQos.toXml(); } public String getSubscribeKeyStr() { return (this.keyData==null) ? "" : this.keyData.toXml(); } public String[] getAccessFilters() { if (this.subscribeQos == null) return new String[0]; AccessFilterQos[] arr = this.subscribeQos.getAccessFilterArr(); if (arr == null) return new String[0]; String[] ret = new String[arr.length]; for (int i=0; i<arr.length; i++) { ret[i] = arr[i].toXml(); } return ret; } public synchronized String[] getDependingSubscriptions() { if (this.childrenList==null || this.childrenList.size() < 1) return new String[0]; String[] ret = new String[this.childrenList.size()]; for (int i=0; i<this.childrenList.size(); i++) { SubscriptionInfo info = (SubscriptionInfo)this.childrenList.get(i); ret[i] = info.toXml(); } return ret; } /** JMX */ public java.lang.String usage() { return ServerScope.getJmxUsageLinkInfo(this.getClass().getName(), null); } /** JMX */ public java.lang.String getUsageUrl() { return ServerScope.getJavadocUrl(this.getClass().getName(), null); } /* JMX dummy to have a copy/paste functionality in jconsole */ public void setUsageUrl(java.lang.String url) {}}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -