⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 persistencecacheplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
   }   /**    * We set the cache props to the real props for RAM queue running under a cacheQueue    */   private QueuePropertyBase createRamCopy(QueuePropertyBase queuePropertyBase) {      QueuePropertyBase ramCopy = (QueuePropertyBase)queuePropertyBase.clone();      ramCopy.setMaxEntries(queuePropertyBase.getMaxEntriesCache());      ramCopy.setMaxBytes(queuePropertyBase.getMaxBytesCache());      return ramCopy;   }   /**    * @see I_Map#setProperties(Object)    */   public synchronized void setProperties(Object userData) throws XmlBlasterException {      if (userData == null) return;      if (log.isLoggable(Level.FINER)) log.finer("Entering setProperties()");      QueuePropertyBase newProp;      try {         newProp = (QueuePropertyBase)userData;      }      catch(Throwable e) {         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_CONFIGURATION, ME, "Can't configure queue, your properties are invalid", e);      }      /* Do we need to protect against shrinking?      if (this.property != null && this.property.getMaxEntries() > newProp.getMaxEntries()) {         log.warn(ME, "Reconfigure of a RamQueuePlugin - getMaxNumOfEntries from " + this.property.getMaxEntries() +                    " to " + newProp.getMaxEntries() + " is not supported, we ignore the new setting.");         return;      }      */      this.property = newProp;      this.transientStore.setProperties(createRamCopy((QueuePropertyBase)userData));      if (this.persistentStore != null) this.persistentStore.setProperties(userData);   }   /**    * Access the current queue configuration    */   public Object getProperties() {      return this.property;   }   // JMX   public String getPropertyStr() {      return (this.property == null) ? "" : this.property.toXml();   }   /**    * All entries are stored into the transient queue. All persistent entries are    * stored also in the persistent queue. The exceeding size in the transient    * queue is calculated. If it is positive it means we need to swap. The    * overflowing entries are taken from the ram queue. The volatile between    * them are stored in the persistent storage (since the persistent ones have    * been previously stored).    * @see I_Map#put(I_MapEntry)    */   public int put(I_MapEntry mapEntry) throws XmlBlasterException {      if (mapEntry == null)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "put(I_MapEntry="+mapEntry+")");      if (log.isLoggable(Level.FINER)) log.finer("put(" + mapEntry.getLogId() + ")");      int numPersistentPut = 0;      int numTransientPut = 0;      synchronized(this) {         XmlBlasterException exceptionReturned = spaceLeftException(mapEntry, this);         if (exceptionReturned != null) {            if (log.isLoggable(Level.FINE)) log.fine(exceptionReturned.getMessage());            exceptionReturned.setLocation(ME+"-put("+mapEntry.getLogId()+")");            throw exceptionReturned;         }         // separate persistent from transient entries and store the persistents in persistence         if (this.persistentStore != null && this.isConnected) {            if (mapEntry.isPersistent()) {               XmlBlasterException exceptionReturned2 = spaceLeftException(mapEntry, this.persistentStore);               if (exceptionReturned2 != null) {                  if (log.isLoggable(Level.FINE)) log.fine(exceptionReturned2.getMessage());                  exceptionReturned2.setLocation(ME+"-put("+mapEntry.getLogId()+")");                  throw exceptionReturned2;               }               try {                  numPersistentPut = this.persistentStore.put(mapEntry);               }               catch (XmlBlasterException ex) {                  log.severe("put: an error occurred when writing to the persistent queue, the persistent entry " + mapEntry.getLogId() +                                " will temporarily be handled as transient. Is the DB up and running ? " + ex.getMessage() + "state "  + this.toXml(""));               }            }         }         assureTransientSpace(mapEntry);                  numTransientPut = this.transientStore.put(mapEntry);      } // sync(this)      this.storageSizeListenerHelper.invokeStorageSizeListener();      if (numPersistentPut>0 || numTransientPut>0) {         return 1;      }      // NOTE: It is possible that a persistent entry is not put to persistent storage      // e.g. because of 'duplicate key' (entry existed already) and same with RAM queue      // In this case the caller does get a 0      return 0;   }   /**    * Swap an entry away to hard disk.     * Call this method from synchronized code only.    * @param mapEntry The new entry which needs space for itself.     */   private void assureTransientSpace(I_MapEntry mapEntry) throws XmlBlasterException {      while (!spaceLeft(mapEntry, this.transientStore)) {         /* Protect against infinite looping */         if (this.transientStore == null || this.property == null ||             this.transientStore.getNumOfEntries() < 1)            break;         I_MapEntry oldest = this.transientStore.removeOldest();         if (oldest == null) {            if (log.isLoggable(Level.FINE)) log.fine("The RAM queue is full, new entry '" + mapEntry.getUniqueId() + "' seems to be the first and only one, so we accept it");            break;         }         if (log.isLoggable(Level.FINER)) log.finer("Swapping '" + oldest.getLogId() + "' to HD ...");         try {            if (!oldest.isPersistent()) { // if entry is marked as persistent it is already in persistentStore (see code above)               // swap away the oldest cache entry to harddisk ...               if (log.isLoggable(Level.FINE)) log.fine("Swapping '" + oldest.getLogId() + " size=" + oldest.getSizeInBytes() + "'. Exceeding size state after removing from transient before entering persistent: " + toXml(""));               if (this.persistentStore == null)                  throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME,                        "assureTransientSpace: no persistent queue configured, needed for swapping, entry " + mapEntry.getLogId() + " is not handled");               if (!this.isConnected)                  throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME,                        "assureTransientSpace: The DB is currently disconnected, entry " + mapEntry.getLogId() + " is not handled");               if (spaceLeft(oldest, this.persistentStore)) {                  try {                     this.persistentStore.put(oldest);                  }                  catch (XmlBlasterException ex) {                     log.severe("assureTransientSpace: an error occured when writing to the persistent queue, transient entry " +  oldest.getLogId() +                            " is not swapped, new entry '" + mapEntry.getLogId() + "' is rejected. Is the DB up and running ? " + ex.getMessage() + " state: " + toXml(""));                     throw ex;                  }               }               else                  throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME,                              "assureTransientSpace: maximum size in bytes for the persistent queue exceeded when swapping, entry " + mapEntry.getLogId() + " not handled . State: " + toXml(""));            }            oldest.isSwapped(true);         }         catch(XmlBlasterException ex2) {            this.transientStore.put(oldest); // undo on error            throw ex2;  // swapping failed, we won't accept the new entry         }      }   }   /**    * Check is storage is big enough for entry    * @param mapEntry may not be null    * @return null There is space (otherwise the error text is returned)    */   private XmlBlasterException spaceLeftException(I_MapEntry mapEntry, I_Map map) {      if (map == null || this.property == null) {         return new XmlBlasterException(glob, ErrorCode.RESOURCE_UNAVAILABLE, ME,                "Storage framework is down, current settings are" + toXml(""));      }      if ((1 + map.getNumOfEntries()) > map.getMaxNumOfEntries())         return new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME,                "Queue overflow (number of entries), " + getNumOfEntries() +                " entries are in queue, try increasing property '" +                this.property.getPropName("maxEntries") + "' and '" +                this.property.getPropName("maxEntriesCache") + "', current settings are" + toXml(""));      if ((mapEntry.getSizeInBytes() + map.getNumOfBytes()) > map.getMaxNumOfBytes())         return new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME,                "Queue overflow, " + getMaxNumOfBytes() +                " bytes are in queue, try increasing property '" +                 this.property.getPropName("maxBytes") + "' and '" +                this.property.getPropName("maxBytesCache") + "', current settings are" + toXml(""));      return null;   }   /**    * Check is storage is big enough for entry    * @param mapEntry may not be null    * @return true Space enough    */   private boolean spaceLeft(I_MapEntry mapEntry, I_Map map) {      if (map == null || this.property == null)         return false;      if ((1 + map.getNumOfEntries()) > map.getMaxNumOfEntries())         return false;      if ((mapEntry.getSizeInBytes() + map.getNumOfBytes()) > map.getMaxNumOfBytes())         return false;      return true;   }   /**    * Returns the unique ID of this queue    */   public StorageId getStorageId() {      return this.storageId;   }   /**    * @see I_Map#get(long)    */   public I_MapEntry get(final long uniqueId) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering get(" + uniqueId + ")");      I_MapEntry mapEntry = null;      synchronized(this) {         mapEntry = this.transientStore.get(uniqueId);         if (mapEntry != null) {            mapEntry.isSwapped(false);            return mapEntry;         }         if (this.persistentStore == null)            return null;         mapEntry = this.persistentStore.get(uniqueId);         if (mapEntry == null) {            return null;         }         // Ok, we need to swap transient entry back from persistence store         if (!mapEntry.isPersistent()) {            this.persistentStore.remove(mapEntry);         }         assureTransientSpace(mapEntry);                  this.transientStore.put(mapEntry);         mapEntry.isSwapped(false);      } // synchronized(this)      return mapEntry;   }   /**    * Access all entries.     * <p />    * TODO !!!: This method should be changed to an iterator approach    * as if we have swapped messages they won't fit to memory.     * @see I_Map#getAll()    */   public I_MapEntry[] getAll(final I_EntryFilter entryFilter) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering getAll()");      synchronized (this) {         //log.error(ME, "getAll() DEBUG ONLY: numSwapped=" + numSwapped() + " transient=" + this.transientStore.getNumOfEntries() + " persistentStore=" + this.persistentStore.getNumOfEntries());         java.util.Map map = new java.util.TreeMap(); // To suppress same entry twice and to be sorted (sorted is not yet specified to be necessary)                  I_MapEntry[] ramEntries = null;         if (entryFilter != null) {            ramEntries = this.transientStore.getAll(new I_EntryFilter() {               public I_Entry intercept(I_Entry entry, I_Storage storage) {                  if (entry.isPersistent())                        return null; // take the one from persistent store                  return entryFilter.intercept(entry, storage);               }            });         }         else {            ramEntries = this.transientStore.getAll(null);         }         for(int i=0; i<ramEntries.length; i++) {            map.put(new Long(ramEntries[i].getUniqueId()), ramEntries[i]);         }         //log.error(ME, "getAll() DEBUG ONLY: map.size=" + map.size() + " numSwapped=" + numSwapped() + " transient=" + this.transientStore.getNumOfEntries());         if (this.persistentStore != null) {            I_MapEntry[] persistEntries = this.persistentStore.getAll(entryFilter);            if (persistEntries != null) {               for(int i=0; i<persistEntries.length; i++) {                  if (persistEntries[i] == null) continue;                  map.put(new Long(persistEntries[i].getUniqueId()), persistEntries[i]);               }            }            //log.error(ME, "getAll() DEBUG ONLY: map.size=" + map.size() + " numSwapped=" + numSwapped() + " persistentStore=" + this.persistentStore.getNumOfEntries());         }         return (I_MapEntry[])map.values().toArray(new I_MapEntry[map.size()]);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -