📄 queuepropertybase.java
字号:
/*------------------------------------------------------------------------------Name: QueueProperty.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.qos.storage;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xml.sax.Attributes;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.property.PropString;import org.xmlBlaster.util.property.PropLong;import org.xmlBlaster.util.property.PropEntry;import org.xmlBlaster.util.property.PropBoolean;import org.xmlBlaster.util.qos.address.AddressBase;/** * Helper class holding callback queue properties. * <p /> * See ConnectQos for XML syntax. * @see org.xmlBlaster.client.qos.ConnectQos */public abstract class QueuePropertyBase implements Cloneable{ protected final Global glob; private static Logger log = Logger.getLogger(QueuePropertyBase.class.getName()); private String propertyPrefix = ""; /** The queue plugin type "CACHE" "RAM" "JDBC" or others */ public static final String DEFAULT_type = "CACHE"; protected PropString type = new PropString(DEFAULT_type); /** The queue plugin version "1.0" or similar */ public static final String DEFAULT_version = "1.0"; protected PropString version = new PropString(DEFAULT_version); /** The max setting allowed for queue maxEntriesCache is adjustable with property "queue.maxEntriesCache=1000" (1000 messages is default) */ public static final long DEFAULT_maxEntriesCacheDefault = 1000L; protected long maxEntriesCacheDefault = DEFAULT_maxEntriesCacheDefault; /** The max setting allowed for queue max size in bytes is adjustable with property "queue.maxBytes=4194304" (10 MBytes is default) */ public static final long DEFAULT_bytesDefault = 10485760L; // 10 MB (MsgUnitStore and client side queue increases it to Integer.MAX_VAULE) protected long maxBytesDefault = DEFAULT_bytesDefault; /** The max setting allowed for queue max size of cache in bytes is adjustable with property "queue.maxBytesCache=4000000" (4 MBytes is default) */ public static final long DEFAULT_bytesCacheDefault = 2097152L; // 2 MB protected long c = DEFAULT_bytesCacheDefault; /** The default settings (as a ratio relative to the maxBytesCache) for the storeSwapLevel */ public static final double DEFAULT_storeSwapLevelRatio = 0.70; /** The default settings (as a ratio relative to the maxBytesCache) for the storeSwapBytes */ public static final double DEFAULT_storeSwapBytesRatio = 0.25; /** The default settings (as a ratio relative to the maxBytesCache) for the storeSwapLevel */ public static final double DEFAULT_reloadSwapLevelRatio = 0.30; /** The default settings (as a ratio relative to the maxBytesCache) for the storeSwapBytes */ public static final double DEFAULT_reloadSwapBytesRatio = 0.25; /** The unique queue or storage name, e.g. "history" */ protected String relating = Constants.RELATING_CALLBACK; /** The max setting allowed for queue maxEntries is adjustable with property "queue.maxEntries=1000" (1000 messages is default) */ public long DEFAULT_maxEntries = 1000L; /** The max. capacity of the queue in number of entries */ protected PropLong maxEntries = new PropLong(DEFAULT_maxEntries); /** The max. capacity of the queue in Bytes */ protected PropLong maxBytes = new PropLong(maxBytesDefault); /** The max. capacity of the cache of the queue in number of entries */ protected PropLong maxEntriesCache = new PropLong(maxEntriesCacheDefault); /** The max. capacity of the queue in Bytes for the cache */ protected PropLong maxBytesCache = new PropLong(c); /** The settings for the storeSwapLevel */ protected long storeSwapLevel = (long)(DEFAULT_storeSwapLevelRatio*this.maxBytesCache.getValue()); /** The settings for the storeSwapBytes */ protected long storeSwapBytes = (long)(DEFAULT_storeSwapBytesRatio*this.maxBytesCache.getValue()); /** The settings for the reloadSwapLevel */ protected long reloadSwapLevel = (long)(DEFAULT_reloadSwapLevelRatio*this.maxBytesCache.getValue()); /** The settings for the storeSwapBytes */ protected long reloadSwapBytes = (long)(DEFAULT_reloadSwapBytesRatio*this.maxBytesCache.getValue()); /** Error handling when queue is full: Constants.ONOVERFLOW_DEADMESSAGE | Constants.ONOVERFLOW_DISCARDOLDEST */ public static final String DEFAULT_onOverflow = Constants.ONOVERFLOW_DEADMESSAGE; protected PropString onOverflow = new PropString(DEFAULT_onOverflow); /** Error handling when callback failed (after all retries etc.): Constants.ONOVERFLOW_DEADMESSAGE */ public static final String DEFAULT_onFailure = Constants.ONOVERFLOW_DEADMESSAGE; protected PropString onFailure = new PropString(DEFAULT_onFailure); public static AddressBase[] EMPTY_ADDRESS_ARR = new AddressBase[0]; /** The corresponding callback address, is set by derived classes */ protected AddressBase[] addressArr = EMPTY_ADDRESS_ARR; /** To allow specific configuration parameters for specific cluster nodes */ protected String nodeId = null; /** To allow debugging the queue (experimental) */ protected PropBoolean debug = new PropBoolean(false); private boolean embedded; /** * @param glob The global handle containing env informations * @param nodeId If not null, the command line properties will look for prop[nodeId] as well, * e.g. -queue/maxEntries and -queue/maxEntries[heron] will be searched<br /> * The nodeId should be stripped from special characters (see glob.getStrippedId()) * e.g. '/' or '[' is not allowed in the nodeId * @see Global#getStrippedId() */ public QueuePropertyBase(Global glob, String nodeId) { if (glob == null) { Thread.dumpStack(); this.glob = new Global(); } else this.glob = glob; this.nodeId = (nodeId == null) ? glob.getStrippedId() : nodeId; } /** * @return The prefix (relating='') for properties e.g. "history" -> "-queue/history/maxEntries" */ public String getPropertyPrefix() { return this.propertyPrefix; } /** * The command line prefix to configure the queue or msgUnitStore * @return e.g. "persistence/msgUnitStore/" or "queue/history/" */ public String getPrefix() { //@return e.g. "msgUnitStore." or "history.queue." //return (this.relating!=null&&this.relating.length() > 0) ? this.relating+"."+getRootTagName()+"." : getRootTagName()+"."; return getRootTagName() + PropEntry.SEP + ((this.relating!=null&&this.relating.length() > 0) ? (this.relating+PropEntry.SEP) : ""); } /** * Helper for logging output, creates the property key for configuration (the command line property). * @param prop e.g. "maxEntries" * @return e.g. "-queue/history/maxEntries" or "-queue/history/maxEntriesCache" */ public String getPropName(String token) { return "-" + getPrefix() + token; } /** * Configure property settings, add your own defaults in the derived class * @param relating e.g. "history" or "callback", similar to <queue related='history'/> or * <persistence related='msgUnitStore'/> etc. */ protected void initialize(String relating) { this.relating = (relating == null) ? "" : relating; String prefix = getPrefix(); String context = null; // something like "/topic/HelloWorld" // extract the plugin type and version from 'defaultPlugin' String propName = null; try { PropString defaultPlugin = new PropString(this.type.getDefaultValue()+","+this.version.getDefaultValue()); // Port to linked ContextNode? propName = defaultPlugin.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "defaultPlugin"); if (log.isLoggable(Level.FINE)) log.fine("Lookup of nodeId=" + nodeId + " context=" + context + " getRootTagName=" + getRootTagName() + " relating=" + relating + " propName=" + propName + " defaultValue=" + defaultPlugin.getValue()); PluginInfo pluginInfo = new PluginInfo(glob, null, defaultPlugin.getValue()); if (defaultPlugin.isModified()) { setType(pluginInfo.getType()); setVersion(pluginInfo.getVersion()); } else { this.type.setDefaultValue(pluginInfo.getType()); this.version.setDefaultValue(pluginInfo.getVersion()); } } catch (XmlBlasterException ex) { log.severe("initialize: could not set the default plugin to what indicated by "+propName); } // The newer way: this.maxEntries.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "maxEntries"); this.maxEntriesCache.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "maxEntriesCache"); this.maxBytes.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "maxBytes"); this.maxBytesCache.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "maxBytesCache"); this.type.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "type", false); this.version.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "version", false); this.onOverflow.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "onOverflow"); this.onFailure.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "onFailure"); this.debug.setFromEnv(this.glob, nodeId, context, getRootTagName(), relating, "debug"); // The old way: // prefix is e.g. "queue/callback" or "msgUnitStore." setStoreSwapLevel(glob.getProperty().get(prefix+"storeSwapLevel", (long)(DEFAULT_storeSwapLevelRatio*this.maxBytesCache.getValue()))); setStoreSwapBytes(glob.getProperty().get(prefix+"storeSwapBytes", (long)(DEFAULT_storeSwapBytesRatio*this.maxBytesCache.getValue()))); setReloadSwapLevel(glob.getProperty().get(prefix+"reloadSwapLevel", (long)(DEFAULT_reloadSwapLevelRatio*this.maxBytesCache.getValue()))); setReloadSwapBytes(glob.getProperty().get(prefix+"reloadSwapBytes", (long)(DEFAULT_reloadSwapBytesRatio*this.maxBytesCache.getValue()))); if (nodeId != null) { setStoreSwapLevel(glob.getProperty().get(prefix+"storeSwapLevel["+nodeId+"]", getStoreSwapLevel())); setStoreSwapBytes(glob.getProperty().get(prefix+"storeSwapBytes["+nodeId+"]", getStoreSwapBytes())); setReloadSwapLevel(glob.getProperty().get(prefix+"reloadSwapLevel["+nodeId+"]", getReloadSwapLevel())); setReloadSwapBytes(glob.getProperty().get(prefix+"reloadSwapBytes["+nodeId+"]", getReloadSwapBytes())); } checkConsistency(); if (log.isLoggable(Level.FINE)) log.fine("Initialized: " + toXml()); } /** * @param relating To what is this queue related: Constants.RELATING_CALLBACK | Constants.RELATING_SUBJECT | Constants.RELATING_CLIENT */ public void setRelating(String relating) { if (Constants.RELATING_CALLBACK.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_CALLBACK; else if (Constants.RELATING_SUBJECT.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_SUBJECT; else if (Constants.RELATING_CLIENT.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_CLIENT; else if (Constants.RELATING_HISTORY.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_HISTORY; else if (Constants.RELATING_MSGUNITSTORE.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_MSGUNITSTORE; else if (Constants.RELATING_TOPICSTORE.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_TOPICSTORE; else if (Constants.RELATING_SUBSCRIBE.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_SUBSCRIBE; else if (Constants.RELATING_SESSION.equalsIgnoreCase(relating)) this.relating = Constants.RELATING_SESSION; else { log.warning("Ignoring relating='" + relating + "'"); Thread.dumpStack(); } } /** * Returns the queue id. * @return relating To what is this queue related: Constants.RELATING_CALLBACK | Constants.RELATING_SUBJECT */ public final String getRelating() { return this.relating; } /** * Max number of messages for this queue. * <br /> * @return number of messages */ public final long getMaxEntries() { return this.maxEntries.getValue(); } public final PropLong getMaxEntriesProp() { return this.maxEntries; } /** * Max number of messages for this queue. * <br /> * @param maxEntries */ public final void setMaxEntries(long maxEntries) { setMaxEntriesUnchecked(maxEntries); //checkConsistency(); } private final void setMaxEntriesUnchecked(long maxEntries) { this.maxEntries.setValue(maxEntries); } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -