📄 persistencecacheplugin.java
字号:
/*------------------------------------------------------------------------------Name: PersistenceCachePlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.msgstore.cache;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.def.PriorityEnum;import org.xmlBlaster.util.admin.I_AdminMap;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.engine.msgstore.StoragePluginManager;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.File;import java.io.FileOutputStream;import java.io.OutputStream;import java.util.ArrayList;import java.util.Properties;/** * Implements a random access message storage. * <p/> * The implementation uses internally a RAM and a JDBC map and handles the caching between those two. * @author michele@laghi.eu * @see <a href="http://www.xmlblaster.org/xmlBlaster/doc/requirements/engine.persistence.html">The engine.persistence requirement</a> * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> * @see org.xmlBlaster.test.classtest.msgstore.I_MapTest */public class PersistenceCachePlugin implements I_StoragePlugin, I_StorageProblemListener, I_Map, PersistenceCachePluginMBean{ private String ME; private ContextNode contextNode; private ServerScope glob; private static Logger log = Logger.getLogger(PersistenceCachePlugin.class.getName());// private java.util.Properties pluginProperties; // properties via I_Plugin private QueuePropertyBase property; // properties via I_Map boolean isDown = true; private StorageId storageId; private I_Map transientStore; private I_Map persistentStore; private boolean isConnected = false; private PluginInfo pluginInfo = null; /** My JMX registration */ private Object mbeanHandle; private StorageSizeListenerHelper storageSizeListenerHelper; public PersistenceCachePlugin() { this.storageSizeListenerHelper = new StorageSizeListenerHelper(this); } /* * this boolean is set only under the time a recovery after having reconnected * to the DB. It is used to limit the synchronization private boolean storeNewPersistentRecovery = false; */ /** * Triggered by persistent store (JDBC) on lost connection * @see org.xmlBlaster.util.queue.jdbc.I_ConnectionListener#disconnected() */ public void storageUnavailable(int oldStatus) { log.finer("storageUnavailable"); this.isConnected = false; } public boolean isTransient() { return this.transientStore.isTransient() && (this.persistentStore == null || this.persistentStore.isTransient()); } /** * Triggered by persistent store (JDBC) on reconnection * @see I_StorageProblemListener#storageAvailable(int) */ public void storageAvailable(int oldStatus) { if (oldStatus == I_StorageProblemListener.UNDEF) return; log.finer("storageAvailable"); /* remove all obsolete entries from the persistence. Obsolete are the * entries which are lower (lower priority and older) than the lowest * entry in the transient storage. */ if (this.persistentStore == null) return; // should never happen //try { log.warning("Persistent store has reconnected, we may have a memory leak as send messsages are not cleaned up. Current persistent messages are handled transient only, new ones will be handled persistent"); /* // TODO: Implement an arraylist to remember the sent messages and destroy them // Happens for persistent messages and swapped messages (if JDBC connection lost) // For swapped entries the callback thread could block (poll) until the swap is available again. synchronized(this.deleteDeliveredMonitor) { I_MapEntry limitEntry = this.transientStore.peek(); ArrayList list = this.persistentStore.peekWithLimitEntry(limitEntry); this.persistentStore.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()])); } */ /* log.warn(ME, "Persistent store has reconnected, current persistent messages are handled transient only, new ones will be handled persistent"); // add all new persistent entries to the persistent storage ... this.storeNewPersistentRecovery = true; synchronized(this.storeNewPersistentRecoveryMonitor) { I_MapEntry limitEntry = this.persistentStore.peek(); ArrayList list = this.transientStore.peekWithLimitEntry(limitEntry); this.persistentStore.put((I_MapEntry[])list.toArray(new I_MapEntry[list.size()]), false); } this.storeNewPersistentRecovery = false; */ this.isConnected = true; //} //catch (XmlBlasterException ex) { // log.error(ME, "exception occured when reconnecting. " + ex.getMessage()); //} } /** * Is called after the instance is created. * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name) * @see org.xmlBlaster.engine.msgstore.I_Map#initialize(StorageId, Object) */ public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException { if (this.isDown) { java.util.Properties pluginProperties = null; if (this.pluginInfo != null) pluginProperties = this.pluginInfo.getParameters(); if (pluginProperties == null) pluginProperties = new java.util.Properties(); // if loaded from testsuite without a PluginManager this.property = null; this.ME = this.getClass().getName() + "-" + uniqueQueueId; this.storageId = uniqueQueueId; try { this.property = (QueuePropertyBase)userData; } catch(Throwable e) { throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION, ME, "Can't configure queue, your properties are invalid", e); } if (log.isLoggable(Level.FINER)) log.finer("Entering initialize(" + getType() + ", " + getVersion() + ")"); if (this.property != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) { log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null)); } this.glob = (ServerScope)this.property.getGlobal(); // For JMX instanceName may not contain "," String instanceName = this.glob.validateJmxValue(this.storageId.getId()); this.contextNode = new ContextNode(ContextNode.MAP_MARKER_TAG, instanceName, this.glob.getContextNode()); // TODO: pass from real parent like TopicInfo this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); // StoragePluginManager pluginManager = (StoragePluginManager)this.glob.getObjectEntry("org.xmlBlaster.engine.msgstore.StoragePluginManager"); StoragePluginManager pluginManager = glob.getStoragePluginManager(); QueuePropertyBase queuePropertyBase = (QueuePropertyBase)userData; //instantiate and initialize the underlying queues String defaultTransient = pluginProperties.getProperty("transientMap", "RAM,1.0").trim(); if (defaultTransient.startsWith(getType())) { log.severe("Cache storage configured with transientMap=CACHE, to prevent recursion we set it to 'RAM,1.0'"); defaultTransient = "RAM,1.0"; } QueuePropertyBase ramProps = createRamCopy(queuePropertyBase); ramProps.setEmbedded(true); this.transientStore = pluginManager.getPlugin(defaultTransient, uniqueQueueId, ramProps); if (log.isLoggable(Level.FINE)) log.fine("Created transient part:" + this.transientStore.toXml("")); try { String defaultPersistent = pluginProperties.getProperty("persistentMap", "JDBC,1.0").trim(); if (defaultPersistent.startsWith(getType())) { log.severe("Cache storage configured with persistentMap=CACHE, to prevent recursion we set it to 'JDBC,1.0'"); defaultPersistent = "JDBC,1.0"; } boolean oldEmbedded = queuePropertyBase.isEmbedded(); // since a CACHE could be inside a CACHE queuePropertyBase.setEmbedded(true); this.persistentStore = pluginManager.getPlugin(defaultPersistent, uniqueQueueId, queuePropertyBase); queuePropertyBase.setEmbedded(oldEmbedded); // since it is not a clone we make sure to reset it to its original this.isConnected = true; // to be notified about reconnections / disconnections// this.glob.getJdbcQueueManager(this.storageId).registerStorageProblemListener(this); this.persistentStore.registerStorageProblemListener(this); if (log.isLoggable(Level.FINE)) log.fine("Created persistent part:" + this.persistentStore.toXml("")); } catch (XmlBlasterException ex) { log.severe("could not initialize the persistent queue. Is the JDBC Driver jar file in the CLASSPATH ? Is the DB up and running ?" + ex.getMessage()); // start a polling thread to see if the connection can be established later } // do the queue specific stuff like delete all volatile entries in // the persistent queue if (this.persistentStore != null) { try { if (log.isLoggable(Level.FINE)) log.fine("Initialize: Removing swapped entries from persistent store, numEntries=" + this.persistentStore.getNumOfEntries() + " numPersistentEntries=" + this.persistentStore.getNumOfPersistentEntries()); this.persistentStore.removeTransient(); } catch (XmlBlasterException ex) { log.severe("could not remove transient entries (swapped entries) probably due to no connection to the DB, or the DB is down" + ex.getMessage()); } // prefill cache (hack: works only for our JDBC queue which implements I_Queue as well) if (this.persistentStore instanceof org.xmlBlaster.util.queue.I_Queue) { if (log.isLoggable(Level.FINE)) log.fine("Initialize: Prefilling cache storage with entries"); if (this.persistentStore.getNumOfEntries() > 0) { // initial fill of RAM queue ... long maxBytes = this.transientStore.getMaxNumOfBytes(); // this.transientStore.getMaxNumOfEntries(); long maxEntries = this.transientStore.getMaxNumOfEntries(); ArrayList entries = null; try { entries = ((org.xmlBlaster.util.queue.I_Queue)this.persistentStore).peek((int)maxEntries, maxBytes); int n = entries.size(); log.info("Prefilling cache with " + n + " entries"); synchronized(this) { for(int i=0; i<n; i++) { I_MapEntry cleanEntry = (I_MapEntry)entries.get(i); this.transientStore.put(cleanEntry); } } } catch (XmlBlasterException ex) { log.severe("could not reload data from persistence probably due to a broken connection to the DB or the DB is not up and running: " + ex.getMessage()); } } } } this.isDown = false; if (log.isLoggable(Level.FINE)) log.fine("Successful initialized: " + toXml("")); } // isDown? } // JMX public String getQueueName() { return getStorageId().getStrippedId(); } // JMX public int removeById(long uniqueId) throws Exception { try { return remove(uniqueId); } catch (XmlBlasterException e) { throw new Exception(e.toString()); } } // JMX public String removeOldestEntry() throws Exception { try { I_MapEntry entry = removeOldest(); return (entry==null) ? null : entry.toString(); } catch (XmlBlasterException e) { throw new Exception(e.toString()); } } // JMX public int removeTransientEntries() throws Exception { try { return removeTransient(); } catch (XmlBlasterException e) { throw new Exception(e.toString()); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -