📄 msgqossaxfactory.java
字号:
/*------------------------------------------------------------------------------Name: MsgQosSaxFactory.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.qos;import java.io.FileOutputStream;import java.util.logging.Logger;import org.xmlBlaster.util.FileLocator;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.RcvTimestamp;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.cluster.NodeId;import org.xmlBlaster.util.cluster.RouteInfo;import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import java.util.ArrayList;import java.util.Properties;import org.xml.sax.*;/** * Parsing xml QoS (quality of service) of publish() and update(). 
 * <p />
 * Example for Pub/Sub style:<p />
 * <pre>
 *  <qos>
 *     <state id='OK' info='Keep on running'/> <!-- Only for updates and PtP -->
 *     <priority>5</priority>
 *     <administrative>false</administrative>
 *     <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer -->
 *     <persistent/>
 *     <topic readonly='false' destroyDelay='60000' createDomEntry='true'>
 *        <persistence relating='msgUnitStore' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='deadMessage'/>
 *        <queue relating='history' type='CACHE' version='1.0' maxEntries='1000' maxBytes='4000000' onOverflow='exception'/>
 *     </topic>
 *  </qos>
 * </pre>
 * Example for PtP addressing style:<p />
 * <pre>
 *  <qos>
 *     <subscribable>false</subscribable> <!-- false to make PtP message invisible to subscribers -->
 *     <destination queryType='EXACT' forceQueuing='true'>
 *        Tim
 *     </destination>
 *     <destination queryType='EXACT'>
 *        /node/heron/client/Ben/-2
 *     </destination>
 *     <destination queryType='XPATH'> <!-- Not supported yet -->
 *        //[GROUP='Manager']
 *     </destination>
 *     <destination queryType='XPATH'> <!-- Not supported yet -->
 *        //ROLE/[@id='Developer']
 *     </destination>
 *     <sender>
 *        Gesa
 *     </sender>
 *     <priority>7</priority>
 *     <route>
 *        <node id='bilbo' stratum='2' timestamp='34460239640' dirtyRead='true'/>
 *     </route>
 *  </qos>
 * </pre>
 * <p />
 * Example for update() QoS:<p />
 * <pre>
 *  <qos>
 *     <state id='OK' info='Keep on running'/> <!-- Only for updates and PtP -->
 *     <sender>Tim</sender>
 *     <priority>5</priority>
 *     <subscribe id='__subId:1'/> <!-- Only for updates, PtP messages are marked with id='__subId:PtP' -->
 *     <rcvTimestamp nanos='1007764305862000002'> <!-- UTC time when message was created in xmlBlaster server with a publish() call, in nanoseconds since 1970 -->
 *        2001-12-07 23:31:45.862000002 <!-- The nanos from above but human readable -->
 *     </rcvTimestamp>
 *     <expiration lifeTime='129595811' forceDestroy='false'/> <!-- Only for persistence layer -->
 *     <queue index='0' 
of='1'/> <!-- If queued messages are flushed on login --> * <persistent/> * <redeliver>4</redeliver> <!-- Only for updates --> * <route> * <node id='heron'/> * </route> * </qos> * </pre> * <p> * Note that receiveTimestamp is in nanoseconds, whereas all other time values are milliseconds * </p> * The receive timestamp can be delivered in human readable form as well * by setting on server command line: * <pre> * -cb.receiveTimestampHumanReadable true * * <rcvTimestamp nanos='1015959656372000000'> * 2002-03-12 20:00:56.372 * </rcvTimestamp> * </pre> * @see org.xmlBlaster.test.classtest.qos.MsgQosFactoryTest * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/interface.publish.html">The interface.publish requirement</a> * @author xmlBlaster@marcelruff.info */public class MsgQosSaxFactory extends org.xmlBlaster.util.XmlQoSBase implements I_MsgQosFactory{ private final Global glob; private static Logger log = Logger.getLogger(MsgQosSaxFactory.class.getName()); private MsgQosData msgQosData; /** helper flag for SAX parsing: parsing inside <state> ? */ //private boolean inState; //private boolean inSubscribe; //private boolean inRedeliver; private boolean inTopic; //private boolean inQueue; //private boolean inPersistence; //private boolean inSubscribable; private boolean inDestination; //private boolean inSender; //private boolean inPriority; //private boolean inExpiration; //private boolean inRcvTimestamp; //private boolean inIsVolatile; //private boolean inAdministrative; //private boolean inIsPersistent; //private boolean inForceUpdate; //private boolean inReadonly; private boolean inRoute; private Destination destination; private RouteInfo routeInfo; private boolean sendRemainingLife = true; /** * Can be used as singleton. */ public MsgQosSaxFactory(Global glob) { super(glob); this.glob = glob; } /** * Parses the given xml Qos and returns a MsgQosData holding the data. * Parsing of update() and publish() QoS is supported here. 
* @param the XML based ASCII string */ public synchronized MsgQosData readObject(String xmlQos) throws XmlBlasterException { if (xmlQos == null) { xmlQos = "<qos/>"; } //this.inState = false; //this.inSubscribe = false; //this.inRedeliver = false; this.inTopic = false; //this.inQueue = false; //this.inPersistence = false; //this.inSubscribable = false; this.inDestination = false; //this.inSender = false; //this.inPriority = false; //this.inExpiration = false; //this.inRcvTimestamp = false; //this.inIsVolatile = false; //this.inAdministrative = false; //this.inIsPersistent = false; //this.inForceUpdate = false; //this.inReadonly = false; this.inRoute = false; this.destination = null; this.routeInfo = null; this.sendRemainingLife = true; this.msgQosData = new MsgQosData(glob, this, xmlQos, MethodName.UNKNOWN); if (!isEmpty(xmlQos)) // if possible avoid expensive SAX parsing init(xmlQos); // use SAX parser to parse it (is slow) return msgQosData; } /** * Start element, event from SAX parser. 
* <p /> * @param name Tag name * @param attrs the attributes of the tag */ public final void startElement(String uri, String localName, String name, Attributes attrs) { if (super.startElementBase(uri, localName, name, attrs) == true) return; if (name.equalsIgnoreCase("state")) { if (!inQos) return; //this.inState = true; if (attrs != null) { int len = attrs.getLength(); for (int i = 0; i < len; i++) { if( attrs.getQName(i).equalsIgnoreCase("id") ) { msgQosData.setState(attrs.getValue(i).trim()); } else if( attrs.getQName(i).equalsIgnoreCase("info") ) { msgQosData.setStateInfo(attrs.getValue(i).trim()); } } // if (log.isLoggable(Level.FINE)) log.trace(ME, "Found state tag"); } return; } if (name.equalsIgnoreCase("subscribable")) { if (!inQos) return;// this.inSubscribable = true; if (attrs != null) { int len = attrs.getLength(); for (int i = 0; i < len; i++) { log.warning("Ignoring sent <subscribable> tag " + attrs.getQName(i) + "=" + attrs.getValue(i).trim()); } // if (log.isLoggable(Level.FINE)) log.trace(ME, "Found subscribable tag"); } msgQosData.setSubscribable(true); return; } if (name.equalsIgnoreCase("destination")) { if (!inQos) return; inDestination = true; this.destination = new Destination(); if (attrs != null) { int len = attrs.getLength(); for (int i = 0; i < len; i++) { if( attrs.getQName(i).equalsIgnoreCase("queryType") ) { String queryType = attrs.getValue(i).trim(); if (queryType.equalsIgnoreCase("EXACT")) { this.destination.setQueryType(queryType); } else if (queryType.equalsIgnoreCase("XPATH")) { this.destination.setQueryType(queryType); } else log.severe("Sorry, destination queryType='" + queryType + "' is not supported"); } else if( attrs.getQName(i).equalsIgnoreCase("forceQueuing") ) { String tmp = attrs.getValue(i).trim(); if (tmp.length() > 0) { this.destination.forceQueuing(new Boolean(tmp).booleanValue()); } } } } String tmp = character.toString().trim(); // The address or XPath query string if (tmp.length() > 0) { 
this.destination.setDestination(new SessionName(glob, tmp)); // set address or XPath query string if it is before inner tags character.setLength(0); } return; } if (name.equalsIgnoreCase("sender")) { if (!inQos) return;// this.inSender = true; if (attrs != null) { int len = attrs.getLength(); for (int i = 0; i < len; i++) { log.warning("Ignoring sent <sender> attribute " + attrs.getQName(i) + "=" + attrs.getValue(i).trim()); } // if (log.isLoggable(Level.FINE)) log.trace(ME, "Found sender tag"); } return; } if (name.equalsIgnoreCase("priority")) { if (!inQos) return;// this.inPriority = true; if (attrs != null) { int len = attrs.getLength(); for (int i = 0; i < len; i++) { log.warning("Ignoring sent <priority> attribute " + attrs.getQName(i) + "=" + attrs.getValue(i).trim()); } // if (log.isLoggable(Level.FINE)) log.trace(ME, "Found priority tag"); } return; } if (name.equalsIgnoreCase("expiration")) { if (!inQos) return;// this.inExpiration = true; if (attrs != null) { String tmp = attrs.getValue("lifeTime"); if (tmp != null) { try { msgQosData.setLifeTime(Long.parseLong(tmp.trim())); } catch(NumberFormatException e) { log.severe("Invalid lifeTime - millis =" + tmp); }; } else { log.warning("QoS <expiration> misses lifeTime attribute, setting default of " + MsgQosData.getMaxLifeTime()); msgQosData.setLifeTime(MsgQosData.getMaxLifeTime()); } tmp = attrs.getValue("forceDestroy"); if (tmp != null) { msgQosData.setForceDestroy(new Boolean(tmp.trim()).booleanValue()); } tmp = attrs.getValue("remainingLife"); if (tmp != null) { try { msgQosData.setRemainingLifeStatic(Long.parseLong(tmp.trim())); } catch(NumberFormatException e) { log.severe("Invalid remainingLife - millis =" + tmp); }; } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -