📄 cachequeueinterceptorplugin.java
字号:
} /** * @see I_Queue#peek(int,long) */ public ArrayList peek(int numOfEntries, long numOfBytes) throws XmlBlasterException { synchronized(this.peekSync) { return this.transientQueue.peek(numOfEntries, numOfBytes); } } /** * @see I_Queue#peekSamePriority(int, long) */ public ArrayList peekSamePriority(int numOfEntries, long numOfBytes) throws XmlBlasterException { synchronized(this.peekSync) { return this.transientQueue.peekSamePriority(numOfEntries, numOfBytes); } } /** * @see I_Queue#peekWithPriority(int, long, int, int) */ public ArrayList peekWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { synchronized(this.peekSync) { return this.transientQueue.peekWithPriority(numOfEntries, numOfBytes, minPriority, maxPriority); } } /** * @see I_Queue#peekWithLimitEntry(I_QueueEntry) * @deprecated */ public ArrayList peekWithLimitEntry(I_QueueEntry limitEntry) throws XmlBlasterException { synchronized(this.peekSync) { return this.transientQueue.peekWithLimitEntry(limitEntry); } } /** * @see I_Queue#removeWithLimitEntry(I_QueueEntry, boolean) */ synchronized public long removeWithLimitEntry(I_QueueEntry limitEntry, boolean inclusive) throws XmlBlasterException { long ret = this.transientQueue.removeWithLimitEntry(limitEntry, inclusive); if (isPersistenceAvailable()) { try { ret = this.persistentQueue.removeWithLimitEntry(limitEntry, inclusive); } catch (XmlBlasterException ex) { log.severe(ME+"removeWithLimitEntry: exception occured when removing from persistence. reason: " + ex.getMessage()); ex.printStackTrace(); } catch (Throwable ex) { log.severe(ME+"removeWithLimitEntry: exception occured when removing from persistence. reason: " + ex.toString()); ex.printStackTrace(); } } return ret; } /** * Removes the first element in the queue * This method does not block. * @return Number of messages erased (0 or 1) * @throws XmlBlasterException if the underlying implementation gets an exception. 
*/
   public int remove() throws XmlBlasterException {
      return (int)remove(1, -1L);
   }

   /**
    * Removes up to the given number of entries / bytes from the head of the queue.
    * This method does not block.
    * <p>
    * Entries are peeked and removed in batches while holding the lock on this
    * queue. The removal listeners of all removed entries are notified after
    * the lock has been released.
    *
    * @param numOfEntries Erase this many entries, or fewer if fewer are available;
    *        a negative value means no limit on the number of entries
    * @param numOfBytes   Upper limit on the bytes to remove; a negative value means no limit
    * @return Number of entries erased
    * @throws XmlBlasterException if numOfEntries exceeds Integer.MAX_VALUE or
    *         if the underlying implementation gets an exception.
    */
   public long remove(long numOfEntries, long numOfBytes) throws XmlBlasterException {
      if (numOfEntries > Integer.MAX_VALUE)
         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries);

      long ret = 0L;
      int nmax = (int)numOfEntries;
      if (nmax < 0) nmax = Integer.MAX_VALUE;
      if (numOfBytes < 0L) numOfBytes = Long.MAX_VALUE;

      // Collects the removed entries of ALL batches: previously only the entries of
      // the last batch were remembered, so earlier batches were never notified.
      ArrayList removedList = new ArrayList();
      try {
         synchronized(this) {
            while ((nmax > 0) && (numOfBytes > 0L)) {
               ArrayList list = peek(nmax, numOfBytes);
               if ((list == null) || (list.size() < 1)) break;
               long delta = this.transientQueue.getNumOfBytes();
               I_Entry[] entries = (I_Entry[])list.toArray(new I_Entry[list.size()]);
               boolean[] tmp = removeRandomNoNotify(entries);
               // removeRandomNoNotify may hand back null if the transient removal failed
               if (tmp == null) break;
               int removedEntries = 0;
               for (int i=0; i < tmp.length; i++) {
                  if (tmp[i]) {
                     removedEntries++;
                     removedList.add(entries[i]);
                  }
               }
               // No progress: the next peek would return the very same entries, so
               // continuing would loop forever.
               if (removedEntries == 0) break;
               delta -= this.transientQueue.getNumOfBytes();
               nmax -= removedEntries;
               ret += removedEntries;
               numOfBytes -= delta;
            }
         }
      }
      finally {
         // Notify outside of the synchronized block, even if an exception aborted the loop
         if (this.notifiedAboutAddOrRemove) {
            for (int i=0; i < removedList.size(); i++)
               ((I_Entry)removedList.get(i)).removed(this.queueId);
         }
      }
      this.storageSizeListenerHelper.invokeStorageSizeListener();
      return ret;
   }

   /**
    * Removes the given entry.
    * @param dataId the unique id. It must be unique within the storage area
    * of the implementing queue. In other words, if the underlying
    * implementation is on RAM, then the storage area is the JVM, that
    * is the queue must be unique in the same JVM. If the queue is a
    * jdbc, the dataId is unique in the DB used.
*/ public int removeRandom(long dataId) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeRandom(long) not implemented"); } /** * Removes the given entries. * @param msgQueueEntry the entry to erase. */ public long removeRandom(long[] dataIdArray) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeRandom(long[]) not implemented"); } /** * @see I_Queue#removeRandom(I_Entry) */ public int removeRandom(I_Entry entry) throws XmlBlasterException { I_Entry[] entries = new I_Entry[1]; entries[0] = entry; if (removeRandom(entries)[0]) return 1; else return 0; } /** * The given ret array will be updated with the result of the removing from the persistent * queue. */ private final boolean[] removePossibleSwappedEntries(boolean[] ret, I_Entry[] queueEntries) { if (log.isLoggable(Level.FINER)) log.finer(ME+"removePossibleSwappedEntries"); // prepare the entries array if (!isPersistenceAvailable()) return ret; int numUnremoved = 0; for (int i=0; i < ret.length; i++) if (!ret[i]) numUnremoved++; if (numUnremoved == 0) return ret; if (!hasTransientsSwapped()) return ret; if (log.isLoggable(Level.FINE)) log.fine(ME+"There were entries '" + numUnremoved + "' to delete on persistence"); if (queueEntries == null || queueEntries.length < 1) return ret; I_Entry[] unremovedEntries = new I_Entry[numUnremoved]; int count = 0; for (int i=0; i < ret.length; i++) { if (!ret[i]) { unremovedEntries[count] = queueEntries[i]; count++; } } try { boolean[] ret1 = this.persistentQueue.removeRandom(unremovedEntries); count = 0; for (int i=0; i < ret.length; i++) { if (!ret[i]) { ret[i] = ret1[count]; count++; if (log.isLoggable(Level.FINEST)) log.finest(ME+"Entry '" + unremovedEntries[count].getUniqueId() + "' has been deleted ? 
: " + ret1[count]); } } } catch (XmlBlasterException ex) { log.severe(ME+"exception occured when trying to remove entries which have supposely been swapped since the last peek. reason: " + ex.getMessage()); ex.printStackTrace(); return ret; } return ret; } /** * @see I_Queue#removeRandom(I_Entry[]) */ private final boolean[] removeRandomNoNotify(I_Entry[] queueEntries) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer(ME+"removeRandom(I_QueueEntry[])"); if (queueEntries == null || queueEntries.length < 1) return new boolean[0]; boolean[] ret = null; synchronized (this) { try { if (isPersistenceAvailable()) { ArrayList persistents = new ArrayList(); for (int i=0; i < queueEntries.length; i++) { if (queueEntries[i].isPersistent()) persistents.add(queueEntries[i]); } if (log.isLoggable(Level.FINE)) log.fine(ME+"Remove " + persistents.size() + " persistent entries from persistent storage"); try { this.persistentQueue.removeRandom((I_Entry[])persistents.toArray(new I_Entry[persistents.size()])); } catch (XmlBlasterException ex) { log.severe(ME+"could not remove " + persistents.size() + " entries from the persistent queue. 
Probably due to failed connection to the DB exception: " + ex.getMessage()); ex.printStackTrace(); } } // and now the transient queue (the ram queue) if (log.isLoggable(Level.FINE)) log.fine(ME+"Removing from transient queue " + queueEntries.length + " entries"); try { ret = this.transientQueue.removeRandom(queueEntries); ret = removePossibleSwappedEntries(ret, queueEntries); } catch (XmlBlasterException ex) { log.severe(ME+"could not remove " + queueEntries.length + " entries from the transient queue.: " + ex.getMessage()); ex.printStackTrace(); } } finally { try { loadFromPersistence(); } catch (XmlBlasterException ex1) { log.severe(ME+"removeRandom exception occured when loading from persistence: " + ex1.getMessage()); ex1.printStackTrace(); } } } return ret; } /** * @see I_Queue#removeRandom(I_Entry[]) */ public final boolean[] removeRandom(I_Entry[] queueEntries) throws XmlBlasterException { boolean[] ret = removeRandomNoNotify(queueEntries); if (this.notifiedAboutAddOrRemove) { for(int i=0; i<ret.length; i++) { if (ret[i]) { try { queueEntries[i].removed(this.queueId); } catch (Throwable ex) { log.severe(ME+"removeRandom: exception when notifying about removal: " + ex.toString()); ex.printStackTrace(); } } } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * Loads from the persistence so much data as it fits into the transient * queue. */ synchronized private final int loadFromPersistence() throws XmlBlasterException { if (!isPersistenceAvailable()) return 0; // load further entries from persistence into transient queue if(hasUncachedEntries()) { //or should it only fill a certain amount (percent) of the queue size ? 
long freeEntries = this.transientQueue.getMaxNumOfEntries() - this.transientQueue.getNumOfEntries(); if (freeEntries > maxFetchSize) freeEntries = maxFetchSize; long freeBytes = this.transientQueue.getMaxNumOfBytes() - this.transientQueue.getNumOfBytes(); if (freeEntries <= 0L || freeBytes <= 0L) { if (log.isLoggable(Level.FINE)) log.fine(ME+"The transient queue is already full." + " numOfBytes=" + this.transientQueue.getNumOfBytes() + " maxNumOfBytes=" + this.transientQueue.getMaxNumOfBytes() + " numOfEntries=" + this.transientQueue.getNumOfEntries() + " maxNumOfEntries=" + this.transientQueue.getMaxNumOfBytes()); if (log.isLoggable(Level.FINEST)) log.finest(ME+"The real current size in bytes of transient queue is: " + ((RamQueuePlugin)this.transientQueue).getSynchronizedNumOfBytes()); return 0; } if (log.isLoggable(Level.FINE)) log.fine(ME+"Swapping: reloading from persistence for a length of " + freeBytes); // 1. Look into persistent store ... ArrayList list = null; try { list = this.persistentQueue.peek((int)freeEntries, freeBytes); } catch (XmlBlasterException ex) { log.severe(ME+"Could not read back data from persistence: " + ex.getMessage()); ex.printStackTrace(); } if (list == null || list.size() < 1) { return 0; } // 2. Put it into RAM ... try { this.transientQueue.put((I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()]), false); } catch (XmlBlasterException ex) { log.severe(ME+"loadFromPeristence: no space left on transient queue: " + ex.getMessage()); ex.printStackTrace(); return 0; } // 3. Erase the swapped and transient entries from persistence ... 
ArrayList transients = new ArrayList(); int n = list.size(); for(int i=0; i<n; i++) { if (!((I_Entry)list.get(i)).isPersistent()) { transients.add(list.get(i)); } } try { if (transients.size() > 0) this.persistentQueue.removeRandom((I_Entry[])transients.toArray(new I_Entry[transients.size()])); } catch (XmlBlasterException ex) { log.severe(ME+"loadFromPeristence: Memory leak: problems removing " + transients.size() + " swapped transient entries form persistent store: " + ex.getMessage()); return list.size(); } return transients.size(); } return 0; } /** * @see I_Queue#removeWithPriority(long, long, int, int) */ public long removeWithPriority(long numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { ArrayList list = null; boolean[] tmp = null; synchronized(this) { if (numOfEntries > Integer.MAX_VALUE) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries); list = peekWithPriority((int)numOfEntries, numOfBytes, minPriority, maxPriority); if (list == null || list.size() < 1) return 0L; tmp = removeRandomNoNotify((I_QueueEntry[])list.toArray(new I_QueueEntry[list.size()])); } if (this.notifiedAboutAddOrRemove) { for(int i=0; i<tmp.length; i++) { if (tmp[i]) { try { ((I_Entry)list.get(i)).removed(this.queueId); } catch (Throwable ex) { log.severe(ME+"removeWithPriority exception occured when notifying about removal. Reason: " + ex.toString()); ex.printStackTrace();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -