⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cachequeueinterceptorplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*------------------------------------------------------------------------------Name:      CacheQueueInterceptorPlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.cache;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import java.util.ArrayList;import org.xmlBlaster.util.queue.QueuePluginManager;// import org.xmlBlaster.util.queue.jdbc.I_ConnectionStateListener;// currently only for a dump ...import org.xmlBlaster.util.queue.ram.RamQueuePlugin;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.File;import java.io.FileOutputStream;import java.util.Properties;import java.io.OutputStream;/** * Implements a queue cache.  * Internally it utilizes a RAM queue and a JDBC queue and manages the cache logic.  * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/queue.cache.html">The queue.cache requirement</a> * @author michele@laghi.eu * @author xmlBlaster@marcelruff.info */public class CacheQueueInterceptorPlugin implements I_Queue, I_StoragePlugin, I_StorageProblemListener, CacheQueueInterceptorPluginMBean{   private static Logger log = Logger.getLogger(CacheQueueInterceptorPlugin.class.getName());   private String ME;   private ContextNode contextNode;   private QueuePropertyBase property;             // plugins via I_Queue   private boolean notifiedAboutAddOrRemove = false;   boolean isDown = true;   private StorageId queueId;   private I_QueuePutListener putListener;//   private java.util.Properties pluginProperties;  // plugins via I_Plugin   private I_Queue transientQueue;   private I_Queue persistentQueue;   private Global glob;   private boolean isConnected = false;   /** object used to control the swapping performance */   //private CacheControlParam controlParam;   private PluginInfo pluginInfo;   /** this is the sync between the peaks and the swapping: no peak should be allowed while swapping */   private Object peekSync = new Object();   /** My JMX registration */   private Object mbeanHandle;      private long maxFetchSize = Long.MAX_VALUE;   private StorageSizeListenerHelper storageSizeListenerHelper;      public CacheQueueInterceptorPlugin() {      this.storageSizeListenerHelper = new StorageSizeListenerHelper(this);   }      public boolean isTransient() {      return this.transientQueue.isTransient() && this.persistentQueue.isTransient();   }   /**    * Helper method to check the space left on a given queue.    * @param  queue the queue on which to calculate the space left.    * @param  valueToCheckAgainst the amount of bytes which are subtracted (needed in the queue) in this               check.    * @param  ifFullThrowException if 'true' this method will throw an exception if the return value would              be negative    * @return long the space left on the specified queue after having occupied the queue with what is     *         specified in 'valueToCheckAgainst'    * @throws XmlBlasterException if the 'ifFullThrowException' flag has been set to 'true' and the     *         return value would be negative.    */   private final long checkSpaceAvailable(I_Queue queue, long valueToCheckAgainst, boolean ifFullThrowException, String extraTxt)       throws XmlBlasterException {      long spaceLeft = queue.getMaxNumOfBytes() - queue.getNumOfBytes() - valueToCheckAgainst;      if (log.isLoggable(Level.FINE)) log.fine(ME+"maxNumOfBytes=" + queue.getMaxNumOfBytes() + "' numOfBytes='" + queue.getNumOfBytes() + "'. Occured at " + extraTxt);      if (spaceLeft < 0L && (log.isLoggable(Level.FINE) || ifFullThrowException)) {         String maxBytes = "maxBytes";         String queueName = "Cache";         if (queue == this.transientQueue) {            maxBytes = "maxBytesCache";            queueName = "Transient";         }         else if (queue == this.persistentQueue) {            queueName = "Persistent";         }         String reason = queueName + " queue overflow, " + queue.getNumOfBytes() +                         " bytes are in queue, try increasing '" +                          this.property.getPropName(maxBytes) + "' on client login: " + extraTxt;         if (log.isLoggable(Level.FINE)) log.fine(ME+ reason + this.toXml(""));         if (ifFullThrowException)            throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, reason);      }      return spaceLeft;   }   private final long checkEntriesAvailable(I_Queue queue, long valueToCheckAgainst, boolean ifFullThrowException, String extraTxt)       throws XmlBlasterException {      long entriesLeft = queue.getMaxNumOfEntries() - queue.getNumOfEntries() - valueToCheckAgainst;      if (entriesLeft < 0L && (log.isLoggable(Level.FINE) || ifFullThrowException)) {         String maxEntries = "maxEntries";         String queueName = "Cache";         if (queue == this.transientQueue) {            maxEntries = "maxEntriesCache";            queueName = "Transient";         }         else if (queue == this.persistentQueue) {            queueName = "Persistent";         }         String reason = queueName + " queue overflow, " + queue.getNumOfEntries() +                         " entries are in queue, try increasing '" +                          this.property.getPropName(maxEntries) + "' on client login: " + extraTxt;         if (log.isLoggable(Level.FINE)) log.fine(ME+ reason + this.toXml(""));         if (ifFullThrowException)            throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason);      }      return entriesLeft;   }   /**    * @see I_StorageProblemListener#storageUnavailable(int)    */   synchronized public void storageUnavailable(int oldStatus) {      if (log.isLoggable(Level.FINER))          log.finer(ME+"entering storageUnavailable");      this.isConnected = false;      // we could optimize this by providing a peekLast method to the I_Queue      try {         this.transientQueue.peek(-1, -1L);      }      catch (XmlBlasterException ex) {         log.severe(ME+"storageUnavailable: exception occured when peeking the transient queue: " + ex.getMessage());         ex.printStackTrace();      }   }   /**    * @see I_StorageProblemListener#storageAvailable(int)    */   synchronized public void storageAvailable(int oldStatus) {      if (oldStatus == I_StorageProblemListener.UNDEF) return;      if (log.isLoggable(Level.FINER)) log.finer(ME+"entering storageAvailable");     /* remove all obsolete messages from the persitence. */      if (this.persistentQueue == null) return; // should never happen      try {         boolean isInclusive = true; // if the reference is the original one then it is inclusive, if it is a new one then it is exclusive         I_QueueEntry limitEntry = null; // this.referenceEntry;         if (log.isLoggable(Level.FINE)) {            if (limitEntry == null) log.fine(ME+"The reference entry is null");            else log.fine(ME+"The reference entry is '" + limitEntry.getUniqueId() + "' and its flag 'stored' is '" + limitEntry.isStored() + "'");         }         ArrayList list = null;         if (limitEntry == null || limitEntry.isStored()) {            isInclusive = false;            limitEntry = this.transientQueue.peek(); // get the first entry in the RAM queue as ref            if (log.isLoggable(Level.FINE)) {               if (limitEntry == null) log.fine(ME+"The new reference entry is null");               else log.fine(ME+"The new reference entry is '" + limitEntry.getUniqueId() + "'");            }         }         if (limitEntry == null) { // then ram queue was empty when it lost connection and is empty now            isInclusive = false;            this.persistentQueue.clear();          }         // remove all old msg which are higher than the reference entry all more important msg were sent already                  else this.persistentQueue.removeWithLimitEntry(limitEntry, isInclusive);          limitEntry = this.persistentQueue.peek();         if (limitEntry != null) {            list = this.transientQueue.peekWithLimitEntry(limitEntry);            if (list.size() > 0) {               // TAKE AWAY ALL TRANSIENTS !!!!!!               long countToPut = this.persistentQueue.getMaxNumOfEntries() - this.persistentQueue.getNumOfEntries();               long bytesToPut = this.persistentQueue.getMaxNumOfBytes() - this.persistentQueue.getNumOfBytes();               long currBytes = 0L;               ArrayList list2 = new ArrayList();               for (int i=list.size()-1; i >= 0; i--) {                  I_Entry entry = (I_Entry)list.get(i);                  if (entry.isPersistent()) {                     if (currBytes >= bytesToPut || list2.size() >= countToPut) {                        break;                     }                     list2.add(entry);                     currBytes += entry.getSizeInBytes();                  }               }               if (list2.size() > 0) {                  this.persistentQueue.put((I_QueueEntry[])list2.toArray(new I_QueueEntry[list2.size()]), false);               }           }         }         this.isConnected = true;      }      catch (XmlBlasterException ex) {         log.severe(ME+"exception occured when reconnecting. " + ex.getMessage());         ex.printStackTrace();      }      finally {         try {            loadFromPersistence();         }         catch (XmlBlasterException ex) {            log.severe(ME+"storageAvailable: exception when loading from persistence: " + ex.getMessage());            ex.printStackTrace();         }      }   }   /**    * Is called after the instance is created.    * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name)    * @see I_Queue#initialize(StorageId, Object)    */   synchronized public void initialize(StorageId uniqueQueueId, Object userData)      throws XmlBlasterException   {      if (this.isDown) {         java.util.Properties pluginProperties = null;         if (this.pluginInfo != null) pluginProperties = this.pluginInfo.getParameters();         if (pluginProperties == null)             pluginProperties = new java.util.Properties(); // if loaded from testsuite without a PluginManager         this.property = null;         this.glob = ((QueuePropertyBase)userData).getGlobal();         this.ME = uniqueQueueId.toString() + ": ";         if (log.isLoggable(Level.FINER)) log.finer(ME+"initialized");         this.queueId = uniqueQueueId;         // For JMX instanceName may not contain ","         String instanceName = this.glob.validateJmxValue(this.queueId.getId());         this.contextNode = new ContextNode(ContextNode.QUEUE_MARKER_TAG, instanceName,                              this.glob.getContextNode()); // TODO: pass from real parent like SubjectInfo         this.mbeanHandle = this.glob.registerMBean(this.contextNode, this);         QueuePluginManager pluginManager = glob.getQueuePluginManager();         QueuePropertyBase queuePropertyBase = (QueuePropertyBase)userData;         try {        	 this.maxFetchSize = Long.valueOf(pluginProperties.getProperty("maxFetchSize", ""+maxFetchSize)).longValue();         }         catch (Throwable e) {             log.warning(ME+"Setting maxFetchSize failed: " + e.toString());         }         //instantiate and initialize the underlying queues         String defaultTransient = pluginProperties.getProperty("transientQueue", "RAM,1.0").trim();         if (defaultTransient.startsWith(getType())) {            log.severe(ME+"Cache queue configured with transientQueue=CACHE, to prevent recursion we set it to 'RAM,1.0'");            defaultTransient = "RAM,1.0";         }         QueuePropertyBase ramProps = createRamCopy(queuePropertyBase);         ramProps.setEmbedded(true);         this.transientQueue = pluginManager.getPlugin(defaultTransient, uniqueQueueId, ramProps);         //log.error(ME, "Debug only: " + this.transientQueue.toXml(""));                  try {            String defaultPersistent = pluginProperties.getProperty("persistentQueue", "JDBC,1.0").trim();            if (defaultPersistent.startsWith(getType())) {               log.severe(ME+"Cache queue configured with persistentQueue=CACHE, to prevent recursion we set it to 'JDBC,1.0'");               defaultPersistent = "JDBC,1.0";            }            boolean oldEmbedded = queuePropertyBase.isEmbedded(); // since a CACHE could be inside a CACHE            queuePropertyBase.setEmbedded(true);            this.persistentQueue = pluginManager.getPlugin(defaultPersistent, uniqueQueueId, queuePropertyBase);            queuePropertyBase.setEmbedded(oldEmbedded); // since it is not a clone we make sure to reset it to its original            this.isConnected = true;            // to be notified about reconnections / disconnections//            this.glob.getJdbcQueueManager(this.queueId).registerListener(this);            this.persistentQueue.registerStorageProblemListener(this);         }         catch (XmlBlasterException ex) {            log.severe(ME+"Could not initialize the persistent queue '" + uniqueQueueId + "'. Is the JDBC Driver jar file in the CLASSPATH ?" +                " Is the DB up and running ? We continue RAM based ..." + ex.getMessage() +                " The propery settings are:" + queuePropertyBase.toXml());            // start a polling thread to see if the connection can be established later             ex.printStackTrace();         }         // do the queue specific stuff like delete all volatile entries in         // the persistent queue         if (isPersistenceAvailable()) {            try {               this.persistentQueue.removeTransient();            }            catch (XmlBlasterException ex) {               log.severe(ME+"could not remove transient entries (swapped entries) probably due to no connection to the DB, or the DB is down");               ex.printStackTrace();            }            setProperties(userData);            // not used yet            //this.controlParam = new CacheControlParam((QueuePropertyBase)getProperties());            loadFromPersistence();            // on restart the added() event is not triggered!         } // persistentQueue!=null         this.isDown = false;         if (log.isLoggable(Level.FINE)) log.fine(ME+"Successful initialized");      } // isDown?   }   /**    * We set the cache props to the real props for RAM queue running under a cacheQueue    */   private QueuePropertyBase createRamCopy(QueuePropertyBase queuePropertyBase) {      QueuePropertyBase ramCopy = (QueuePropertyBase)queuePropertyBase.clone();      ramCopy.setMaxEntries(queuePropertyBase.getMaxEntriesCache());      ramCopy.setMaxBytes(queuePropertyBase.getMaxBytesCache());      return ramCopy;   }   /**    * @see I_Queue#setProperties(Object)    */   synchronized public void setProperties(Object userData) throws XmlBlasterException {      if (userData == null) return;      QueuePropertyBase newProp;      try {         newProp = (QueuePropertyBase)userData;      }      catch(Throwable e) {         log.severe(ME+"Can't configure queue, your properties are invalid: " + e.toString());         e.printStackTrace();         return;      }      /* Do we need to protect against shrinking?      if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) {         log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() +                    " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting.");         return;      }      */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -