⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jdbcqueuecommontableplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*------------------------------------------------------------------------------                              Name:      JdbcQueueCommonTablePlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.jdbc;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.ReturnDataHolder;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.I_ChangeCallback;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.io.OutputStream;import java.io.IOException;import java.sql.SQLException;import java.util.ArrayList;import java.util.Properties;/** * Persistence queue implementation on a DB based on JDBC. 
* @author michele@laghi.eu
 * @author xmlBlaster@marcelruff.info
 * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/queue.jdbc.commontable.html">The queue.jdbc.commontable requirement</a>
 */
public final class JdbcQueueCommonTablePlugin implements I_Queue, I_StoragePlugin, I_Map
{
   /** Name used as location in log and exception texts, built in initialize() from the class name and the storage id. */
   private String ME;
   /** Identifier of this storage, assigned in initialize(). */
   private StorageId storageId;
   /** Flag which is only stored and returned by its setter and getter in this class. */
   private boolean notifiedAboutAddOrRemove = false;
   /** Replaced in initialize() by the Global of the configured property. */
   private Global glob;
   private static Logger log = Logger.getLogger(JdbcQueueCommonTablePlugin.class.getName());
   /** The current queue configuration, see setProperties(Object). */
   private QueuePropertyBase property;
   /** The JDBC manager doing the database work, looked up (or created) in initialize(). */
   private JdbcManagerCommonTable manager = null;
   /** The one and only put listener (interceptor), null if none is registered. */
   private I_QueuePutListener putListener;
   // Cached counters. The marker value -999L is used for "not set"
   // because it is easier to identify than -1L
   private long numOfEntries = -999L;
   private long numOfPersistentEntries = -999L;
   private long numOfPersistentBytes = -999L;
   private long numOfBytes = -999L;
   /** True until initialize() has completed successfully. */
   boolean isDown = true;
   /** Monitor object used to synchronize the count of sizes */
   private Object modificationMonitor = new Object();
   /** The plugin configuration handed to getJdbcQueueManagerCommonTable() in initialize(). */
   private PluginInfo pluginInfo = null;
   /** Mirrors the debug setting of the property, updated in initialize(). */
   private boolean debug = false;
   private int entryCounter;
   private StorageSizeListenerHelper storageSizeListenerHelper;

   /**
    * Creates the plugin instance together with its helper for storage size listeners.
    * The instance is not usable before initialize() has been called.
    */
   public JdbcQueueCommonTablePlugin() {
      this.storageSizeListenerHelper = new StorageSizeListenerHelper(this);
   }

   /**
    * @return always false
    */
   public boolean isTransient() {
      return false;
   }

   /**
    * This method resets all cached sizes and counters. While testing it
    * could be invoked before each public invocation to see which method fails ...
*/   private final void resetCounters() {      try {         this.numOfPersistentBytes = -999L;         this.numOfPersistentBytes = getNumOfPersistentBytes_(true);         this.numOfPersistentEntries = -999L;         this.numOfPersistentEntries = getNumOfPersistentEntries_(true);         this.numOfBytes = -999L;         this.numOfBytes = getNumOfBytes_();         this.numOfEntries = -999L;         this.numOfEntries = getNumOfEntries_();      }      catch (XmlBlasterException ex) {         if (log.isLoggable(Level.FINE)) log.fine("resetCounters exception occured: " + ex.getMessage());      }   }   /**    * Check is storage is big enough for entry    * @param mapEntry may not be null    * @return null There is space (otherwise the error text is returned)    */   private String spaceLeft(long numOfEntries, long sizeInBytes) {            // allow one owerload only ...      numOfEntries = 0L;      sizeInBytes  = 0L;      if (this.property == null) {         return "Storage framework is down, current settings are" + toXml("");      }      if ((numOfEntries + getNumOfEntries()) > getMaxNumOfEntries())         return "Queue overflow (number of entries), " + getNumOfEntries() +                " entries are in queue, try increasing property '" +                this.property.getPropName("maxEntries") + "' and '" +                this.property.getPropName("maxEntriesCache") + "', current settings are" + toXml("");      if ((sizeInBytes + getNumOfBytes()) > getMaxNumOfBytes())         return "Queue overflow, " + getMaxNumOfBytes() +                " bytes are in queue, try increasing property '" +                 this.property.getPropName("maxBytes") + "' and '" +                this.property.getPropName("maxBytesCache") + "', current settings are" + toXml("");      return null;   }   /**    * Calculates the size in bytes of all entries in the array.    
*//*   private long calculateSizeInBytes(I_QueueEntry[] entries) {      long sum = 0L;      for (int i=0; i<entries.length; i++) {         sum += entries[i].getSizeInBytes();      }      return sum;   }*/   /**    * Returns a JdbcManagerCommonTable for a specific queue. It strips the queueId to    * find out to which manager it belongs. If such a manager does not exist    * yet, it is created and initialized.    * A queueId must be of the kind: cb:some/id/or/someother    * where the important requirement here is that it contains a ':' character.    * text on the left side of the separator (in this case 'cb') tells which    * kind of queue it is: for example a callback queue (cb) or a client queue.    */   protected JdbcManagerCommonTable getJdbcQueueManagerCommonTable(PluginInfo pluginInfo)      throws XmlBlasterException {      String location = ME + "/type '" + pluginInfo.getType() + "' version '" + pluginInfo.getVersion() + "'";      String managerName = pluginInfo.toString(); //  + "-" + pluginInfo.getTypeVersion();      Object obj = this.glob.getObjectEntry(managerName);                    JdbcManagerCommonTable manager = null;      try {         if (obj == null) {           synchronized (JdbcManagerCommonTable.class) {              obj = this.glob.getObjectEntry(managerName); // could have been initialized meanwhile                            if ( obj == null) {                 JdbcConnectionPool pool = new JdbcConnectionPool();                 pool.initialize(this.glob, pluginInfo.getParameters());                 manager = new JdbcManagerCommonTable(pool, this.glob.getEntryFactory(), managerName, this);                 pool.registerStorageProblemListener(manager);                 manager.setUp();                 if (log.isLoggable(Level.FINE)) log.fine("Created JdbcManagerCommonTable instance for storage plugin configuration '" + managerName + "'");                       this.glob.addObjectEntry(managerName, manager);              }              else 
manager = (JdbcManagerCommonTable)obj;           }         }         else manager = (JdbcManagerCommonTable)obj;         if (!manager.getPool().isInitialized()) {            manager.getPool().initialize(this.glob, pluginInfo.getParameters());            if (log.isLoggable(Level.FINE)) log.fine("Initialized JdbcManager pool for storage class '" + managerName + "'");         }      }      catch (ClassNotFoundException ex) {         log.severe("getJdbcCommonTableQueueManager class not found: " + ex.getMessage());         ex.printStackTrace();         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, location, "getJdbcQueueCommonTableManager: class not found when initializing the connection pool", ex);      }      catch (SQLException ex) {         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, location, "getJdbcCommonTableQueueManager: sql exception when initializing the connection pool", ex);      }      catch (Throwable ex) {         if (log.isLoggable(Level.FINE)) {            log.fine("getJdbcQueueCommonTableManager internal exception: " + ex.toString());            ex.printStackTrace();         }         throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, location, "getJdbcQueueManager: throwable when initializing the connection pool", ex);      }      if (this.glob.getWipeOutDB()) {         synchronized (this.glob) {            if (this.glob.getWipeOutDB()) {               manager.wipeOutDB(true);               this.glob.setWipeOutDB(false);            }         }      }      return manager;   }   /**    * Is called after the instance is created.    * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. 
file name)    * @see I_Queue#initialize(StorageId, Object)    */   synchronized public void initialize(StorageId uniqueQueueId, Object userData)      throws XmlBlasterException   {      if (this.isDown) {         this.property = null;         setProperties(userData);                  this.ME = this.getClass().getName() + "-" + uniqueQueueId;         this.storageId = uniqueQueueId;         if (log.isLoggable(Level.FINER)) log.finer("initialize '" + this.storageId + "'");         if (this.property != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) {            log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null));         }         this.glob = this.property.getGlobal();         this.manager = getJdbcQueueManagerCommonTable(this.pluginInfo);         this.numOfEntries = this.manager.getNumOfEntries(getStorageId().getStrippedId());         this.numOfBytes = this.manager.getNumOfBytes(this.storageId.getStrippedId());         this.numOfPersistentEntries = this.manager.getNumOfPersistents(getStorageId().getStrippedId());         this.numOfPersistentBytes = this.manager.getSizeOfPersistents(getStorageId().getStrippedId());         this.isDown = false;         this.manager.registerQueue(this);         if (log.isLoggable(Level.FINE)) log.fine("Successful initialized");      }      boolean dbg = this.glob.getProperty().get("queue/debug", false);      if (dbg == true) this.property.setDebug(true);      this.debug = this.property.getDebug();      if (this.debug) {         log.warning("initialize: debugging is enabled");      }   }   /**    * @see I_Queue#setProperties(Object)    */   public void setProperties(Object userData) throws XmlBlasterException {      if (userData == null) return;      QueuePropertyBase newProp;      try {         newProp = (QueuePropertyBase)userData;      }      catch(Throwable e) {         log.severe("Can't 
configure queue, your properties are invalid: " + e.toString());         e.printStackTrace();         return;      }      // sync necessary?      /* Protect against shrinking ??      if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) {         log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() +                    " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting.");         return;      }      */      this.property = newProp;   }   /**    * Access the current queue configuration    */   public Object getProperties() {      return this.property;   }   public void setNotifiedAboutAddOrRemove(boolean notify) {      this.notifiedAboutAddOrRemove = notify;   }   public boolean isNotifiedAboutAddOrRemove() {      return this.notifiedAboutAddOrRemove;   }   /**    * @see I_Queue#addPutListener(I_QueuePutListener)    */   public void addPutListener(I_QueuePutListener l) {      if (l == null)         throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed");      if (this.putListener != null)         throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already");      this.putListener = l;   }   /**    * @see I_Queue#removePutListener(I_QueuePutListener)    */   public void removePutListener(I_QueuePutListener l) {      this.putListener = null;   }   /**    * Gets the references of the entries in the queue. Note that the data    * which is referenced here may be changed by other threads.    
* @throws XmlBlasterException always, with ErrorCode.INTERNAL_NOTIMPLEMENTED
    */
   public long[] getEntryReferences() throws XmlBlasterException {
      // currently not implemented
      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences not implemented");
   }

   /**
    * Delegates to the manager's getEntries() for this storage id, passing the
    * given filter.
    * @param entryFilter handed over to the manager unchanged
    * @return the list delivered by the manager
    * @see I_Queue#getEntries(I_EntryFilter)
    */
   public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException {
      // NOTE(review): -1 and -1L presumably mean "no limit" for the number of
      // entries and the number of bytes, confirm against JdbcManagerCommonTable
      return this.manager.getEntries(getStorageId(), -1, -1L, entryFilter);
   }

   /**
    * @see I_Queue#put(I_QueueEntry, boolean)
    */
   public void put(I_QueueEntry queueEntry, boolean ignorePutInterceptor)
      throws XmlBlasterException
   {
      // a null entry is silently ignored
      if (queueEntry == null) return;

      if ((this.putListener != null) && (!ignorePutInterceptor)) {
         // Is an interceptor registered (and not bypassed) ?
         // If putPre() returns false the entry is not stored here
         if (this.putListener.putPre(queueEntry) == false)
            return;
      }

      put(queueEntry);

      if (this.putListener != null && !ignorePutInterceptor) {
         this.putListener.putPost(queueEntry);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -