📄 jdbcqueuecommontableplugin.java
字号:
/*------------------------------------------------------------------------------ Name: JdbcQueueCommonTablePlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.jdbc;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.ReturnDataHolder;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.OutputStream;import java.io.IOException;import java.sql.SQLException;import java.util.ArrayList;import java.util.Properties;/** * Persistence queue implementation on a DB based on JDBC. * @author michele@laghi.eu * @author xmlBlaster@marcelruff.info * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/queue.jdbc.commontable.html">The queue.jdbc.commontable requirement</a> */public final class JdbcQueueCommonTablePlugin implements I_Queue, I_StoragePlugin, I_Map{ private String ME; private StorageId storageId; private boolean notifiedAboutAddOrRemove = false; private Global glob; private static Logger log = Logger.getLogger(JdbcQueueCommonTablePlugin.class.getName()); private QueuePropertyBase property; private JdbcManagerCommonTable manager = null; private I_QueuePutListener putListener; // to set it to -999L makes it easier to identify than -1L private long numOfEntries = -999L; private long numOfPersistentEntries = -999L; private long numOfPersistentBytes = -999L; private long numOfBytes = -999L; boolean isDown = true; /** Monitor object used to synchronize the count of sizes */ private Object modificationMonitor = new Object(); private PluginInfo pluginInfo = null; private boolean debug = false; private int entryCounter; private StorageSizeListenerHelper storageSizeListenerHelper; public JdbcQueueCommonTablePlugin() { this.storageSizeListenerHelper = new StorageSizeListenerHelper(this); } public boolean isTransient() { return false; } /** * This method resets all cached sizes and counters. While testing it * could be invoked before each public invocation to see which method fails ... */ private final void resetCounters() { try { this.numOfPersistentBytes = -999L; this.numOfPersistentBytes = getNumOfPersistentBytes_(true); this.numOfPersistentEntries = -999L; this.numOfPersistentEntries = getNumOfPersistentEntries_(true); this.numOfBytes = -999L; this.numOfBytes = getNumOfBytes_(); this.numOfEntries = -999L; this.numOfEntries = getNumOfEntries_(); } catch (XmlBlasterException ex) { if (log.isLoggable(Level.FINE)) log.fine("resetCounters exception occured: " + ex.getMessage()); } } /** * Check is storage is big enough for entry * @param mapEntry may not be null * @return null There is space (otherwise the error text is returned) */ private String spaceLeft(long numOfEntries, long sizeInBytes) { // allow one owerload only ... numOfEntries = 0L; sizeInBytes = 0L; if (this.property == null) { return "Storage framework is down, current settings are" + toXml(""); } if ((numOfEntries + getNumOfEntries()) > getMaxNumOfEntries()) return "Queue overflow (number of entries), " + getNumOfEntries() + " entries are in queue, try increasing property '" + this.property.getPropName("maxEntries") + "' and '" + this.property.getPropName("maxEntriesCache") + "', current settings are" + toXml(""); if ((sizeInBytes + getNumOfBytes()) > getMaxNumOfBytes()) return "Queue overflow, " + getMaxNumOfBytes() + " bytes are in queue, try increasing property '" + this.property.getPropName("maxBytes") + "' and '" + this.property.getPropName("maxBytesCache") + "', current settings are" + toXml(""); return null; } /** * Calculates the size in bytes of all entries in the array. *//* private long calculateSizeInBytes(I_QueueEntry[] entries) { long sum = 0L; for (int i=0; i<entries.length; i++) { sum += entries[i].getSizeInBytes(); } return sum; }*/ /** * Returns a JdbcManagerCommonTable for a specific queue. It strips the queueId to * find out to which manager it belongs. If such a manager does not exist * yet, it is created and initialized. * A queueId must be of the kind: cb:some/id/or/someother * where the important requirement here is that it contains a ':' character. * text on the left side of the separator (in this case 'cb') tells which * kind of queue it is: for example a callback queue (cb) or a client queue. */ protected JdbcManagerCommonTable getJdbcQueueManagerCommonTable(PluginInfo pluginInfo) throws XmlBlasterException { String location = ME + "/type '" + pluginInfo.getType() + "' version '" + pluginInfo.getVersion() + "'"; String managerName = pluginInfo.toString(); // + "-" + pluginInfo.getTypeVersion(); Object obj = this.glob.getObjectEntry(managerName); JdbcManagerCommonTable manager = null; try { if (obj == null) { synchronized (JdbcManagerCommonTable.class) { obj = this.glob.getObjectEntry(managerName); // could have been initialized meanwhile if ( obj == null) { JdbcConnectionPool pool = new JdbcConnectionPool(); pool.initialize(this.glob, pluginInfo.getParameters()); manager = new JdbcManagerCommonTable(pool, this.glob.getEntryFactory(), managerName, this); pool.registerStorageProblemListener(manager); manager.setUp(); if (log.isLoggable(Level.FINE)) log.fine("Created JdbcManagerCommonTable instance for storage plugin configuration '" + managerName + "'"); this.glob.addObjectEntry(managerName, manager); } else manager = (JdbcManagerCommonTable)obj; } } else manager = (JdbcManagerCommonTable)obj; if (!manager.getPool().isInitialized()) { manager.getPool().initialize(this.glob, pluginInfo.getParameters()); if (log.isLoggable(Level.FINE)) log.fine("Initialized JdbcManager pool for storage class '" + managerName + "'"); } } catch (ClassNotFoundException ex) { log.severe("getJdbcCommonTableQueueManager class not found: " + ex.getMessage()); ex.printStackTrace(); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, location, "getJdbcQueueCommonTableManager: class not found when initializing the connection pool", ex); } catch (SQLException ex) { throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, location, "getJdbcCommonTableQueueManager: sql exception when initializing the connection pool", ex); } catch (Throwable ex) { if (log.isLoggable(Level.FINE)) { log.fine("getJdbcQueueCommonTableManager internal exception: " + ex.toString()); ex.printStackTrace(); } throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, location, "getJdbcQueueManager: throwable when initializing the connection pool", ex); } if (this.glob.getWipeOutDB()) { synchronized (this.glob) { if (this.glob.getWipeOutDB()) { manager.wipeOutDB(true); this.glob.setWipeOutDB(false); } } } return manager; } /** * Is called after the instance is created. * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name) * @see I_Queue#initialize(StorageId, Object) */ synchronized public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException { if (this.isDown) { this.property = null; setProperties(userData); this.ME = this.getClass().getName() + "-" + uniqueQueueId; this.storageId = uniqueQueueId; if (log.isLoggable(Level.FINER)) log.finer("initialize '" + this.storageId + "'"); if (this.property != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) { log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null)); } this.glob = this.property.getGlobal(); this.manager = getJdbcQueueManagerCommonTable(this.pluginInfo); this.numOfEntries = this.manager.getNumOfEntries(getStorageId().getStrippedId()); this.numOfBytes = this.manager.getNumOfBytes(this.storageId.getStrippedId()); this.numOfPersistentEntries = this.manager.getNumOfPersistents(getStorageId().getStrippedId()); this.numOfPersistentBytes = this.manager.getSizeOfPersistents(getStorageId().getStrippedId()); this.isDown = false; this.manager.registerQueue(this); if (log.isLoggable(Level.FINE)) log.fine("Successful initialized"); } boolean dbg = this.glob.getProperty().get("queue/debug", false); if (dbg == true) this.property.setDebug(true); this.debug = this.property.getDebug(); if (this.debug) { log.warning("initialize: debugging is enabled"); } } /** * @see I_Queue#setProperties(Object) */ public void setProperties(Object userData) throws XmlBlasterException { if (userData == null) return; QueuePropertyBase newProp; try { newProp = (QueuePropertyBase)userData; } catch(Throwable e) { log.severe("Can't configure queue, your properties are invalid: " + e.toString()); e.printStackTrace(); return; } // sync necessary? /* Protect against shrinking ?? if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) { log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() + " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting."); return; } */ this.property = newProp; } /** * Access the current queue configuration */ public Object getProperties() { return this.property; } public void setNotifiedAboutAddOrRemove(boolean notify) { this.notifiedAboutAddOrRemove = notify; } public boolean isNotifiedAboutAddOrRemove() { return this.notifiedAboutAddOrRemove; } /** * @see I_Queue#addPutListener(I_QueuePutListener) */ public void addPutListener(I_QueuePutListener l) { if (l == null) throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed"); if (this.putListener != null) throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already"); this.putListener = l; } /** * @see I_Queue#removePutListener(I_QueuePutListener) */ public void removePutListener(I_QueuePutListener l) { this.putListener = null; } /** * Gets the references of the entries in the queue. Note that the data * which is referenced here may be changed by other threads. */ public long[] getEntryReferences() throws XmlBlasterException { // currently not implemented throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences not implemented"); } /** * @see I_Queue#getEntries(I_EntryFilter) */ public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException { return this.manager.getEntries(getStorageId(), -1, -1L, entryFilter); } /** * @see I_Queue#put(I_QueueEntry, boolean) */ public void put(I_QueueEntry queueEntry, boolean ignorePutInterceptor) throws XmlBlasterException { if (queueEntry == null) return; if ((this.putListener != null) && (!ignorePutInterceptor)) { // Is an interceptor registered (and not bypassed) ? if (this.putListener.putPre(queueEntry) == false) return; } put(queueEntry); if (this.putListener != null && !ignorePutInterceptor) { this.putListener.putPost(queueEntry);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -