⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 persistencecacheplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*------------------------------------------------------------------------------Name:      PersistenceCachePlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.msgstore.cache;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.admin.I_AdminMap;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.engine.msgstore.StoragePluginManager;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.File;import java.io.FileOutputStream;import java.io.OutputStream;import java.util.ArrayList;import java.util.Properties;/** * Implements a random access message storage. * <p/> * The implementation uses internally a RAM and a JDBC map and handles the caching between those two. * @author michele@laghi.eu * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.persistence.html">The engine.persistence requirement</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> * @see org.xmlBlaster.test.classtest.msgstore.I_MapTest  */public class PersistenceCachePlugin implements I_StoragePlugin, I_StorageProblemListener, I_Map, PersistenceCachePluginMBean{   private String ME;   private ContextNode contextNode;   private ServerScope glob;   private static Logger log = Logger.getLogger(PersistenceCachePlugin.class.getName());//   private java.util.Properties pluginProperties; // properties via I_Plugin   private QueuePropertyBase property;            // properties via I_Map   boolean isDown = true;   private StorageId storageId;   private I_Map transientStore;   private I_Map persistentStore;   private boolean isConnected = false;   private PluginInfo pluginInfo = null;   /** My JMX registration */   private Object mbeanHandle;   private StorageSizeListenerHelper storageSizeListenerHelper;   public PersistenceCachePlugin() {      this.storageSizeListenerHelper = new StorageSizeListenerHelper(this);   }      /*    * this boolean is set only under the time a recovery after having reconnected    * to the DB. It is used to limit the synchronization   private boolean storeNewPersistentRecovery = false;    */      /**    * Triggered by persistent store (JDBC) on lost connection    * @see org.xmlBlaster.util.queue.jdbc.I_ConnectionListener#disconnected()    */   public void storageUnavailable(int oldStatus) {      log.finer("storageUnavailable");      this.isConnected = false;   }   public boolean isTransient() {      return this.transientStore.isTransient() && (this.persistentStore == null || this.persistentStore.isTransient());   }   /**    * Triggered by persistent store (JDBC) on reconnection    * @see I_StorageProblemListener#storageAvailable(int)    */   public void storageAvailable(int oldStatus) {      if (oldStatus == I_StorageProblemListener.UNDEF) return;      log.finer("storageAvailable");     /* remove all obsolete entries from the persistence. Obsolete are the      * entries which are lower (lower priority and older) than the lowest      * entry in the transient storage.      */      if (this.persistentStore == null) return; // should never happen      //try {         log.warning("Persistent store has reconnected, we may have a memory leak as send messsages are not cleaned up. Current persistent messages are handled transient only, new ones will be handled persistent");         /*         // TODO: Implement an arraylist to remember the sent messages and destroy them         // Happens for persistent messages and swapped messages (if JDBC connection lost)         // For swapped entries the callback thread could block (poll) until the swap is available again.         synchronized(this.deleteDeliveredMonitor) {            I_MapEntry limitEntry = this.transientStore.peek();            ArrayList list = this.persistentStore.peekWithLimitEntry(limitEntry);            this.persistentStore.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()]));         }         */         /*         log.warn(ME, "Persistent store has reconnected, current persistent messages are handled transient only, new ones will be handled persistent");         // add all new persistent entries to the persistent storage ...         this.storeNewPersistentRecovery = true;         synchronized(this.storeNewPersistentRecoveryMonitor) {            I_MapEntry limitEntry = this.persistentStore.peek();            ArrayList list = this.transientStore.peekWithLimitEntry(limitEntry);            this.persistentStore.put((I_MapEntry[])list.toArray(new I_MapEntry[list.size()]), false);         }         this.storeNewPersistentRecovery = false;         */         this.isConnected = true;      //}      //catch (XmlBlasterException ex) {      //   log.error(ME, "exception occured when reconnecting. " + ex.getMessage());      //}   }   /**    * Is called after the instance is created.    * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name)    * @see org.xmlBlaster.engine.msgstore.I_Map#initialize(StorageId, Object)    */   public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException {      if (this.isDown) {         java.util.Properties pluginProperties = null;         if (this.pluginInfo != null) pluginProperties = this.pluginInfo.getParameters();         if (pluginProperties == null)            pluginProperties = new java.util.Properties(); // if loaded from testsuite without a PluginManager         this.property = null;         this.ME = this.getClass().getName() + "-" + uniqueQueueId;         this.storageId = uniqueQueueId;         try {            this.property = (QueuePropertyBase)userData;         }         catch(Throwable e) {            throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION, ME, "Can't configure queue, your properties are invalid", e);         }         if (log.isLoggable(Level.FINER)) log.finer("Entering initialize(" + getType() + ", " + getVersion() + ")");                  if (this.property != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) {            log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null));         }         this.glob = (ServerScope)this.property.getGlobal();         // For JMX instanceName may not contain ","         String instanceName = this.glob.validateJmxValue(this.storageId.getId());         this.contextNode = new ContextNode(ContextNode.MAP_MARKER_TAG, instanceName,                              this.glob.getContextNode()); // TODO: pass from real parent like TopicInfo         this.mbeanHandle = this.glob.registerMBean(this.contextNode, this);         // StoragePluginManager pluginManager = (StoragePluginManager)this.glob.getObjectEntry("org.xmlBlaster.engine.msgstore.StoragePluginManager");         StoragePluginManager pluginManager = glob.getStoragePluginManager();         QueuePropertyBase queuePropertyBase = (QueuePropertyBase)userData;         //instantiate and initialize the underlying queues         String defaultTransient = pluginProperties.getProperty("transientMap", "RAM,1.0").trim();         if (defaultTransient.startsWith(getType())) {            log.severe("Cache storage configured with transientMap=CACHE, to prevent recursion we set it to 'RAM,1.0'");            defaultTransient = "RAM,1.0";         }                  QueuePropertyBase ramProps = createRamCopy(queuePropertyBase);         ramProps.setEmbedded(true);                  this.transientStore = pluginManager.getPlugin(defaultTransient, uniqueQueueId, ramProps);         if (log.isLoggable(Level.FINE)) log.fine("Created transient part:" + this.transientStore.toXml(""));                  try {            String defaultPersistent = pluginProperties.getProperty("persistentMap", "JDBC,1.0").trim();            if (defaultPersistent.startsWith(getType())) {               log.severe("Cache storage configured with persistentMap=CACHE, to prevent recursion we set it to 'JDBC,1.0'");               defaultPersistent = "JDBC,1.0";            }            boolean oldEmbedded = queuePropertyBase.isEmbedded(); // since a CACHE could be inside a CACHE            queuePropertyBase.setEmbedded(true);            this.persistentStore = pluginManager.getPlugin(defaultPersistent, uniqueQueueId, queuePropertyBase);            queuePropertyBase.setEmbedded(oldEmbedded); // since it is not a clone we make sure to reset it to its original            this.isConnected = true;            // to be notified about reconnections / disconnections//            this.glob.getJdbcQueueManager(this.storageId).registerStorageProblemListener(this);            this.persistentStore.registerStorageProblemListener(this);            if (log.isLoggable(Level.FINE)) log.fine("Created persistent part:" + this.persistentStore.toXml(""));         }         catch (XmlBlasterException ex) {            log.severe("could not initialize the persistent queue. Is the JDBC Driver jar file in the CLASSPATH ? Is the DB up and running ?" + ex.getMessage());            // start a polling thread to see if the connection can be established later          }         // do the queue specific stuff like delete all volatile entries in         // the persistent queue         if (this.persistentStore != null) {            try {               if (log.isLoggable(Level.FINE)) log.fine("Initialize: Removing swapped entries from persistent store, numEntries=" + this.persistentStore.getNumOfEntries() + " numPersistentEntries=" + this.persistentStore.getNumOfPersistentEntries());               this.persistentStore.removeTransient();            }            catch (XmlBlasterException ex) {               log.severe("could not remove transient entries (swapped entries) probably due to no connection to the DB, or the DB is down" + ex.getMessage());            }            // prefill cache (hack: works only for our JDBC queue which implements I_Queue as well)            if (this.persistentStore instanceof org.xmlBlaster.util.queue.I_Queue) {               if (log.isLoggable(Level.FINE)) log.fine("Initialize: Prefilling cache storage with entries");               if (this.persistentStore.getNumOfEntries() > 0) {                  // initial fill of RAM queue ...                  long maxBytes = this.transientStore.getMaxNumOfBytes();                  // this.transientStore.getMaxNumOfEntries();                  long maxEntries = this.transientStore.getMaxNumOfEntries();                  ArrayList entries = null;                  try {                     entries = ((org.xmlBlaster.util.queue.I_Queue)this.persistentStore).peek((int)maxEntries, maxBytes);                     int n = entries.size();                     log.info("Prefilling cache with " + n + " entries");                     synchronized(this) {                        for(int i=0; i<n; i++) {                           I_MapEntry cleanEntry = (I_MapEntry)entries.get(i);                           this.transientStore.put(cleanEntry);                        }                     }                  }                  catch (XmlBlasterException ex) {                     log.severe("could not reload data from persistence probably due to a broken connection to the DB or the DB is not up and running: " + ex.getMessage());                  }               }            }         }         this.isDown = false;         if (log.isLoggable(Level.FINE)) log.fine("Successful initialized: " + toXml(""));      } // isDown?   }   // JMX   public String getQueueName() {      return getStorageId().getStrippedId();   }   // JMX   public int removeById(long uniqueId) throws Exception {      try {         return remove(uniqueId);      }      catch (XmlBlasterException e) {         throw new Exception(e.toString());      }   }   // JMX   public String removeOldestEntry() throws Exception {      try {         I_MapEntry entry = removeOldest();         return (entry==null) ? null : entry.toString();      }      catch (XmlBlasterException e) {         throw new Exception(e.toString());      }   }   // JMX   public int removeTransientEntries() throws Exception {      try {         return removeTransient();      }      catch (XmlBlasterException e) {         throw new Exception(e.toString());      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -