📄 cachequeueinterceptorplugin.java
字号:
/*------------------------------------------------------------------------------Name: CacheQueueInterceptorPlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.cache;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import java.util.ArrayList;import org.xmlBlaster.util.queue.QueuePluginManager;// import org.xmlBlaster.util.queue.jdbc.I_ConnectionStateListener;// currently only for a dump ...import org.xmlBlaster.util.queue.ram.RamQueuePlugin;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.File;import java.io.FileOutputStream;import java.util.Properties;import java.io.OutputStream;/** * Implements a queue cache. * Internally it utilizes a RAM queue and a JDBC queue and manages the cache logic. * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.cache.html">The queue.cache requirement</a> * @author michele@laghi.eu * @author xmlBlaster@marcelruff.info */public class CacheQueueInterceptorPlugin implements I_Queue, I_StoragePlugin, I_StorageProblemListener, CacheQueueInterceptorPluginMBean{ private static Logger log = Logger.getLogger(CacheQueueInterceptorPlugin.class.getName()); private String ME; private ContextNode contextNode; private QueuePropertyBase property; // plugins via I_Queue private boolean notifiedAboutAddOrRemove = false; boolean isDown = true; private StorageId queueId; private I_QueuePutListener putListener;// private java.util.Properties pluginProperties; // plugins via I_Plugin private I_Queue transientQueue; private I_Queue persistentQueue; private Global glob; private boolean isConnected = false; /** object used to control the swapping performance */ //private CacheControlParam controlParam; private PluginInfo pluginInfo; /** this is the sync between the peaks and the swapping: no peak should be allowed while swapping */ private Object peekSync = new Object(); /** My JMX registration */ private Object mbeanHandle; private long maxFetchSize = Long.MAX_VALUE; private StorageSizeListenerHelper storageSizeListenerHelper; public CacheQueueInterceptorPlugin() { this.storageSizeListenerHelper = new StorageSizeListenerHelper(this); } public boolean isTransient() { return this.transientQueue.isTransient() && this.persistentQueue.isTransient(); } /** * Helper method to check the space left on a given queue. * @param queue the queue on which to calculate the space left. * @param valueToCheckAgainst the amount of bytes which are subtracted (needed in the queue) in this check. * @param ifFullThrowException if 'true' this method will throw an exception if the return value would be negative * @return long the space left on the specified queue after having occupied the queue with what is * specified in 'valueToCheckAgainst' * @throws XmlBlasterException if the 'ifFullThrowException' flag has been set to 'true' and the * return value would be negative. */ private final long checkSpaceAvailable(I_Queue queue, long valueToCheckAgainst, boolean ifFullThrowException, String extraTxt) throws XmlBlasterException { long spaceLeft = queue.getMaxNumOfBytes() - queue.getNumOfBytes() - valueToCheckAgainst; if (log.isLoggable(Level.FINE)) log.fine(ME+"maxNumOfBytes=" + queue.getMaxNumOfBytes() + "' numOfBytes='" + queue.getNumOfBytes() + "'. Occured at " + extraTxt); if (spaceLeft < 0L && (log.isLoggable(Level.FINE) || ifFullThrowException)) { String maxBytes = "maxBytes"; String queueName = "Cache"; if (queue == this.transientQueue) { maxBytes = "maxBytesCache"; queueName = "Transient"; } else if (queue == this.persistentQueue) { queueName = "Persistent"; } String reason = queueName + " queue overflow, " + queue.getNumOfBytes() + " bytes are in queue, try increasing '" + this.property.getPropName(maxBytes) + "' on client login: " + extraTxt; if (log.isLoggable(Level.FINE)) log.fine(ME+ reason + this.toXml("")); if (ifFullThrowException) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, reason); } return spaceLeft; } private final long checkEntriesAvailable(I_Queue queue, long valueToCheckAgainst, boolean ifFullThrowException, String extraTxt) throws XmlBlasterException { long entriesLeft = queue.getMaxNumOfEntries() - queue.getNumOfEntries() - valueToCheckAgainst; if (entriesLeft < 0L && (log.isLoggable(Level.FINE) || ifFullThrowException)) { String maxEntries = "maxEntries"; String queueName = "Cache"; if (queue == this.transientQueue) { maxEntries = "maxEntriesCache"; queueName = "Transient"; } else if (queue == this.persistentQueue) { queueName = "Persistent"; } String reason = queueName + " queue overflow, " + queue.getNumOfEntries() + " entries are in queue, try increasing '" + this.property.getPropName(maxEntries) + "' on client login: " + extraTxt; if (log.isLoggable(Level.FINE)) log.fine(ME+ reason + this.toXml("")); if (ifFullThrowException) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } return entriesLeft; } /** * @see I_StorageProblemListener#storageUnavailable(int) */ synchronized public void storageUnavailable(int oldStatus) { if (log.isLoggable(Level.FINER)) log.finer(ME+"entering storageUnavailable"); this.isConnected = false; // we could optimize this by providing a peekLast method to the I_Queue try { this.transientQueue.peek(-1, -1L); } catch (XmlBlasterException ex) { log.severe(ME+"storageUnavailable: exception occured when peeking the transient queue: " + ex.getMessage()); ex.printStackTrace(); } } /** * @see I_StorageProblemListener#storageAvailable(int) */ synchronized public void storageAvailable(int oldStatus) { if (oldStatus == I_StorageProblemListener.UNDEF) return; if (log.isLoggable(Level.FINER)) log.finer(ME+"entering storageAvailable"); /* remove all obsolete messages from the persitence. */ if (this.persistentQueue == null) return; // should never happen try { boolean isInclusive = true; // if the reference is the original one then it is inclusive, if it is a new one then it is exclusive I_QueueEntry limitEntry = null; // this.referenceEntry; if (log.isLoggable(Level.FINE)) { if (limitEntry == null) log.fine(ME+"The reference entry is null"); else log.fine(ME+"The reference entry is '" + limitEntry.getUniqueId() + "' and its flag 'stored' is '" + limitEntry.isStored() + "'"); } ArrayList list = null; if (limitEntry == null || limitEntry.isStored()) { isInclusive = false; limitEntry = this.transientQueue.peek(); // get the first entry in the RAM queue as ref if (log.isLoggable(Level.FINE)) { if (limitEntry == null) log.fine(ME+"The new reference entry is null"); else log.fine(ME+"The new reference entry is '" + limitEntry.getUniqueId() + "'"); } } if (limitEntry == null) { // then ram queue was empty when it lost connection and is empty now isInclusive = false; this.persistentQueue.clear(); } // remove all old msg which are higher than the reference entry all more important msg were sent already else this.persistentQueue.removeWithLimitEntry(limitEntry, isInclusive); limitEntry = this.persistentQueue.peek(); if (limitEntry != null) { list = this.transientQueue.peekWithLimitEntry(limitEntry); if (list.size() > 0) { // TAKE AWAY ALL TRANSIENTS !!!!!! long countToPut = this.persistentQueue.getMaxNumOfEntries() - this.persistentQueue.getNumOfEntries(); long bytesToPut = this.persistentQueue.getMaxNumOfBytes() - this.persistentQueue.getNumOfBytes(); long currBytes = 0L; ArrayList list2 = new ArrayList(); for (int i=list.size()-1; i >= 0; i--) { I_Entry entry = (I_Entry)list.get(i); if (entry.isPersistent()) { if (currBytes >= bytesToPut || list2.size() >= countToPut) { break; } list2.add(entry); currBytes += entry.getSizeInBytes(); } } if (list2.size() > 0) { this.persistentQueue.put((I_QueueEntry[])list2.toArray(new I_QueueEntry[list2.size()]), false); } } } this.isConnected = true; } catch (XmlBlasterException ex) { log.severe(ME+"exception occured when reconnecting. " + ex.getMessage()); ex.printStackTrace(); } finally { try { loadFromPersistence(); } catch (XmlBlasterException ex) { log.severe(ME+"storageAvailable: exception when loading from persistence: " + ex.getMessage()); ex.printStackTrace(); } } } /** * Is called after the instance is created. * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name) * @see I_Queue#initialize(StorageId, Object) */ synchronized public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException { if (this.isDown) { java.util.Properties pluginProperties = null; if (this.pluginInfo != null) pluginProperties = this.pluginInfo.getParameters(); if (pluginProperties == null) pluginProperties = new java.util.Properties(); // if loaded from testsuite without a PluginManager this.property = null; this.glob = ((QueuePropertyBase)userData).getGlobal(); this.ME = uniqueQueueId.toString() + ": "; if (log.isLoggable(Level.FINER)) log.finer(ME+"initialized"); this.queueId = uniqueQueueId; // For JMX instanceName may not contain "," String instanceName = this.glob.validateJmxValue(this.queueId.getId()); this.contextNode = new ContextNode(ContextNode.QUEUE_MARKER_TAG, instanceName, this.glob.getContextNode()); // TODO: pass from real parent like SubjectInfo this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); QueuePluginManager pluginManager = glob.getQueuePluginManager(); QueuePropertyBase queuePropertyBase = (QueuePropertyBase)userData; try { this.maxFetchSize = Long.valueOf(pluginProperties.getProperty("maxFetchSize", ""+maxFetchSize)).longValue(); } catch (Throwable e) { log.warning(ME+"Setting maxFetchSize failed: " + e.toString()); } //instantiate and initialize the underlying queues String defaultTransient = pluginProperties.getProperty("transientQueue", "RAM,1.0").trim(); if (defaultTransient.startsWith(getType())) { log.severe(ME+"Cache queue configured with transientQueue=CACHE, to prevent recursion we set it to 'RAM,1.0'"); defaultTransient = "RAM,1.0"; } QueuePropertyBase ramProps = createRamCopy(queuePropertyBase); ramProps.setEmbedded(true); this.transientQueue = pluginManager.getPlugin(defaultTransient, uniqueQueueId, ramProps); //log.error(ME, "Debug only: " + this.transientQueue.toXml("")); try { String defaultPersistent = pluginProperties.getProperty("persistentQueue", "JDBC,1.0").trim(); if (defaultPersistent.startsWith(getType())) { log.severe(ME+"Cache queue configured with persistentQueue=CACHE, to prevent recursion we set it to 'JDBC,1.0'"); defaultPersistent = "JDBC,1.0"; } boolean oldEmbedded = queuePropertyBase.isEmbedded(); // since a CACHE could be inside a CACHE queuePropertyBase.setEmbedded(true); this.persistentQueue = pluginManager.getPlugin(defaultPersistent, uniqueQueueId, queuePropertyBase); queuePropertyBase.setEmbedded(oldEmbedded); // since it is not a clone we make sure to reset it to its original this.isConnected = true; // to be notified about reconnections / disconnections// this.glob.getJdbcQueueManager(this.queueId).registerListener(this); this.persistentQueue.registerStorageProblemListener(this); } catch (XmlBlasterException ex) { log.severe(ME+"Could not initialize the persistent queue '" + uniqueQueueId + "'. Is the JDBC Driver jar file in the CLASSPATH ?" + " Is the DB up and running ? We continue RAM based ..." + ex.getMessage() + " The propery settings are:" + queuePropertyBase.toXml()); // start a polling thread to see if the connection can be established later ex.printStackTrace(); } // do the queue specific stuff like delete all volatile entries in // the persistent queue if (isPersistenceAvailable()) { try { this.persistentQueue.removeTransient(); } catch (XmlBlasterException ex) { log.severe(ME+"could not remove transient entries (swapped entries) probably due to no connection to the DB, or the DB is down"); ex.printStackTrace(); } setProperties(userData); // not used yet //this.controlParam = new CacheControlParam((QueuePropertyBase)getProperties()); loadFromPersistence(); // on restart the added() event is not triggered! } // persistentQueue!=null this.isDown = false; if (log.isLoggable(Level.FINE)) log.fine(ME+"Successful initialized"); } // isDown? } /** * We set the cache props to the real props for RAM queue running under a cacheQueue */ private QueuePropertyBase createRamCopy(QueuePropertyBase queuePropertyBase) { QueuePropertyBase ramCopy = (QueuePropertyBase)queuePropertyBase.clone(); ramCopy.setMaxEntries(queuePropertyBase.getMaxEntriesCache()); ramCopy.setMaxBytes(queuePropertyBase.getMaxBytesCache()); return ramCopy; } /** * @see I_Queue#setProperties(Object) */ synchronized public void setProperties(Object userData) throws XmlBlasterException { if (userData == null) return; QueuePropertyBase newProp; try { newProp = (QueuePropertyBase)userData; } catch(Throwable e) { log.severe(ME+"Can't configure queue, your properties are invalid: " + e.toString()); e.printStackTrace(); return; } /* Do we need to protect against shrinking? if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) { log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() + " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting."); return; } */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -