⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cachequeueinterceptorplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
   }   /**    * @see I_Queue#peek(int,long)    */   public ArrayList peek(int numOfEntries, long numOfBytes) throws XmlBlasterException {      synchronized(this.peekSync) {         return this.transientQueue.peek(numOfEntries, numOfBytes);      }   }   /**    * @see I_Queue#peekSamePriority(int, long)    */   public ArrayList peekSamePriority(int numOfEntries, long numOfBytes) throws XmlBlasterException {      synchronized(this.peekSync) {         return this.transientQueue.peekSamePriority(numOfEntries, numOfBytes);      }   }   /**    * @see I_Queue#peekWithPriority(int, long, int, int)    */   public ArrayList peekWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException {      synchronized(this.peekSync) {         return this.transientQueue.peekWithPriority(numOfEntries, numOfBytes, minPriority, maxPriority);      }   }   /**    * @see I_Queue#peekWithLimitEntry(I_QueueEntry)    * @deprecated    */   public ArrayList peekWithLimitEntry(I_QueueEntry limitEntry) throws XmlBlasterException {      synchronized(this.peekSync) {         return this.transientQueue.peekWithLimitEntry(limitEntry);      }   }   /**    * @see I_Queue#removeWithLimitEntry(I_QueueEntry, boolean)    */   synchronized public long removeWithLimitEntry(I_QueueEntry limitEntry, boolean inclusive) throws XmlBlasterException {      long ret = this.transientQueue.removeWithLimitEntry(limitEntry, inclusive);      if (isPersistenceAvailable()) {         try {            ret = this.persistentQueue.removeWithLimitEntry(limitEntry, inclusive);         }         catch (XmlBlasterException ex) {            log.severe(ME+"removeWithLimitEntry: exception occured when removing from persistence. reason: " + ex.getMessage());            ex.printStackTrace();         }         catch (Throwable ex) {            log.severe(ME+"removeWithLimitEntry: exception occured when removing from persistence. reason: " + ex.toString());            ex.printStackTrace();         }      }      return ret;   }   /**    * Removes the first element in the queue    * This method does not block.    * @return Number of messages erased (0 or 1)    * @throws XmlBlasterException if the underlying implementation gets an exception.    */   public int remove() throws XmlBlasterException {      return (int)remove(1, -1L);   }   /**    * Removes max num messages.    * This method does not block.    * @param num Erase num entries or less if less entries are available, -1 erases everything    * @return Number of entries erased    * @throws XmlBlasterException if the underlying implementation gets an exception.    */   public long remove(long numOfEntries, long numOfBytes) throws XmlBlasterException {      long ret = 0L;      int removedEntries = 0;      if (numOfEntries > Integer.MAX_VALUE)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries);      int nmax = (int)numOfEntries;      if (nmax < 0) nmax = Integer.MAX_VALUE;      if (numOfBytes < 0L) numOfBytes = Long.MAX_VALUE;      I_Entry[] entries = null;      ArrayList  list = null;      boolean[] tmp = null;      try {         synchronized(this) {            while ((nmax > 0) && (numOfBytes > 0L)) {               list = peek(nmax, numOfBytes);               if ((list == null) || (list.size() < 1)) break;               long delta = this.transientQueue.getNumOfBytes();               removedEntries = 0;               entries = (I_Entry[])list.toArray(new I_Entry[list.size()]);               tmp = removeRandomNoNotify(entries);               for (int i=0; i < tmp.length; i++) if (tmp[i]) removedEntries++;                          delta -= this.transientQueue.getNumOfBytes();               nmax -= removedEntries;               ret += removedEntries;               numOfBytes -= delta;            }         }      }      finally {         if (this.notifiedAboutAddOrRemove && tmp!=null && entries!=null) {            for(int i=0; i<tmp.length; i++)               if (tmp[i])                  entries[i].removed(this.queueId);         }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      return ret;   }   /**    * Removes the given entry.    * @param dataId the unique id. It must be unique within the storage area    *        of the implementing queue. In other words, if the underlying    *        implementation is on RAM, then the storage area is the JVM, that    *        is the queue must be unique in the same JVM. If the queue is a    *        jdbc, the dataId is unique in the DB used.    */   public int removeRandom(long dataId) throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeRandom(long) not implemented");   }   /**    * Removes the given entries.    * @param msgQueueEntry the entry to erase.    */   public long removeRandom(long[] dataIdArray) throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeRandom(long[]) not implemented");   }   /**    * @see I_Queue#removeRandom(I_Entry)    */   public int removeRandom(I_Entry entry) throws XmlBlasterException {      I_Entry[] entries = new I_Entry[1];      entries[0] = entry;      if (removeRandom(entries)[0]) return 1;      else return 0;   }   /**    * The given ret array will be updated with the result of the removing from the persistent    * queue.    */   private final boolean[] removePossibleSwappedEntries(boolean[] ret, I_Entry[] queueEntries) {      if (log.isLoggable(Level.FINER)) log.finer(ME+"removePossibleSwappedEntries");      // prepare the entries array      if (!isPersistenceAvailable()) return ret;      int numUnremoved = 0;      for (int i=0; i < ret.length; i++) if (!ret[i]) numUnremoved++;      if (numUnremoved == 0) return ret;      if (!hasTransientsSwapped()) return ret;      if (log.isLoggable(Level.FINE)) log.fine(ME+"There were entries '" + numUnremoved + "' to delete on persistence");      if (queueEntries == null || queueEntries.length < 1) return ret;            I_Entry[] unremovedEntries = new I_Entry[numUnremoved];      int count = 0;      for (int i=0; i < ret.length; i++) {         if (!ret[i]) {            unremovedEntries[count] = queueEntries[i];            count++;         }      }      try {         boolean[] ret1 = this.persistentQueue.removeRandom(unremovedEntries);         count = 0;         for (int i=0; i < ret.length; i++) {            if (!ret[i]) {               ret[i] = ret1[count];               count++;               if (log.isLoggable(Level.FINEST)) log.finest(ME+"Entry '" + unremovedEntries[count].getUniqueId() + "' has been deleted ? : " + ret1[count]);            }         }      }      catch (XmlBlasterException ex) {         log.severe(ME+"exception occured when trying to remove entries which have supposely been swapped since the last peek. reason: " + ex.getMessage());         ex.printStackTrace();         return ret;      }      return ret;   }   /**    * @see I_Queue#removeRandom(I_Entry[])    */   private final boolean[] removeRandomNoNotify(I_Entry[] queueEntries) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer(ME+"removeRandom(I_QueueEntry[])");      if (queueEntries == null || queueEntries.length < 1) return new boolean[0];      boolean[] ret = null;      synchronized (this) {         try {           if (isPersistenceAvailable()) {               ArrayList persistents = new ArrayList();               for (int i=0; i < queueEntries.length; i++) {                  if (queueEntries[i].isPersistent()) persistents.add(queueEntries[i]);               }               if (log.isLoggable(Level.FINE)) log.fine(ME+"Remove " + persistents.size() + " persistent entries from persistent storage");               try {                  this.persistentQueue.removeRandom((I_Entry[])persistents.toArray(new I_Entry[persistents.size()]));               }               catch (XmlBlasterException ex) {                  log.severe(ME+"could not remove " + persistents.size() + " entries from the persistent queue. Probably due to failed connection to the DB exception: " +  ex.getMessage());                  ex.printStackTrace();               }            }                       // and now the transient queue (the ram queue)            if (log.isLoggable(Level.FINE)) log.fine(ME+"Removing from transient queue " + queueEntries.length + " entries");            try {               ret = this.transientQueue.removeRandom(queueEntries);               ret = removePossibleSwappedEntries(ret, queueEntries);            }            catch (XmlBlasterException ex) {               log.severe(ME+"could not remove " + queueEntries.length + " entries from the transient queue.: " + ex.getMessage());               ex.printStackTrace();            }         }         finally {            try {               loadFromPersistence();            }            catch (XmlBlasterException ex1) {               log.severe(ME+"removeRandom exception occured when loading from persistence: " + ex1.getMessage());               ex1.printStackTrace();            }         }      }      return ret;   }   /**    * @see I_Queue#removeRandom(I_Entry[])    */   public final boolean[] removeRandom(I_Entry[] queueEntries) throws XmlBlasterException {      boolean[] ret = removeRandomNoNotify(queueEntries);      if (this.notifiedAboutAddOrRemove) {         for(int i=0; i<ret.length; i++) {            if (ret[i]) {              try {                 queueEntries[i].removed(this.queueId);              }              catch (Throwable ex) {                 log.severe(ME+"removeRandom: exception when notifying about removal: " + ex.toString());                 ex.printStackTrace();              }            }         }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      return ret;   }   /**    * Loads from the persistence so much data as it fits into the transient    * queue.    */   synchronized private final int loadFromPersistence() throws XmlBlasterException {      if (!isPersistenceAvailable()) return 0;      // load further entries from persistence into transient queue      if(hasUncachedEntries()) {         //or should it only fill a certain amount (percent) of the queue size ?         long freeEntries = this.transientQueue.getMaxNumOfEntries() - this.transientQueue.getNumOfEntries();         if (freeEntries > maxFetchSize) freeEntries = maxFetchSize;         long freeBytes = this.transientQueue.getMaxNumOfBytes() - this.transientQueue.getNumOfBytes();         if (freeEntries <= 0L || freeBytes <= 0L) {            if (log.isLoggable(Level.FINE))               log.fine(ME+"The transient queue is already full." +                          " numOfBytes=" + this.transientQueue.getNumOfBytes() +                          " maxNumOfBytes=" + this.transientQueue.getMaxNumOfBytes() +                          " numOfEntries=" + this.transientQueue.getNumOfEntries() +                          " maxNumOfEntries=" + this.transientQueue.getMaxNumOfBytes());            if (log.isLoggable(Level.FINEST)) log.finest(ME+"The real current size in bytes of transient queue is: " + ((RamQueuePlugin)this.transientQueue).getSynchronizedNumOfBytes());            return 0;         }         if (log.isLoggable(Level.FINE)) log.fine(ME+"Swapping: reloading from persistence for a length of " + freeBytes);         // 1. Look into persistent store ...         ArrayList list = null;         try {            list = this.persistentQueue.peek((int)freeEntries, freeBytes);         }         catch (XmlBlasterException ex) {            log.severe(ME+"Could not read back data from persistence: " + ex.getMessage());            ex.printStackTrace();         }         if (list == null || list.size() < 1) {            return 0;         }         // 2. Put it into RAM ...         try {            this.transientQueue.put((I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]), false);         }         catch (XmlBlasterException ex) {            log.severe(ME+"loadFromPeristence: no space left on transient queue: " + ex.getMessage());            ex.printStackTrace();            return 0;         }         // 3. Erase the swapped and transient entries from persistence ...         ArrayList transients = new ArrayList();         int n = list.size();         for(int i=0; i<n; i++) {            if (!((I_Entry)list.get(i)).isPersistent()) {               transients.add(list.get(i));            }         }         try {            if (transients.size() > 0)               this.persistentQueue.removeRandom((I_Entry[])transients.toArray(new I_Entry[transients.size()]));         }         catch (XmlBlasterException ex) {            log.severe(ME+"loadFromPeristence: Memory leak: problems removing " + transients.size() + " swapped transient entries form persistent store: " + ex.getMessage());            return list.size();         }         return transients.size();      }      return 0;   }   /**    * @see I_Queue#removeWithPriority(long, long, int, int)    */   public long removeWithPriority(long numOfEntries, long numOfBytes, int minPriority, int maxPriority)      throws XmlBlasterException {      ArrayList list = null;      boolean[] tmp = null;      synchronized(this) {         if (numOfEntries > Integer.MAX_VALUE)            throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries);         list = peekWithPriority((int)numOfEntries, numOfBytes, minPriority, maxPriority);         if (list == null || list.size() < 1) return 0L;         tmp = removeRandomNoNotify((I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]));      }      if (this.notifiedAboutAddOrRemove) {         for(int i=0; i<tmp.length; i++) {            if (tmp[i]) {               try {                  ((I_Entry)list.get(i)).removed(this.queueId);               }               catch (Throwable ex) {                  log.severe(ME+"removeWithPriority exception occured when notifying about removal. Reason: " + ex.toString());                  ex.printStackTrace();  

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -