📄 ramqueueplugin.java
字号:
} //return (I_QueueEntry)this.storage.iterator().next(); } /** * @param minPrio Extension to I_Queue: if -1 then only entries with similar priority as the first one are taken (= peekSamePriority()) * @see I_Queue#peek(int, long) */ public ArrayList peekWithPriority(int numOfEntries, long numOfBytes, int minPrio, int maxPrio) throws XmlBlasterException { return genericPeek(numOfEntries, numOfBytes, minPrio, maxPrio).list; } /** * * @param numOfEntries the number of entries to peek. If -1 then all entries * found are peeked. * @param numOfBytes as input it is the size in bytes to retrieve. * @param minPrio the minimum priority to return. If a negative number, then * all entries which have the same priority as the first entry are returned. * this value is inclusive. * @param maxPrio the maximum priority to return (inclusive). */ private ReturnDataHolder genericPeek(int numOfEntries, long numOfBytes, int minPrio, int maxPrio) throws XmlBlasterException { ReturnDataHolder ret = new ReturnDataHolder();// long numOfBytes = bytes.longValue();// long count = 0L; long currentSizeInBytes = 0L;// long totalSizeInBytes = 0L;// ArrayList ret = new ArrayList(); if (getNumOfEntries() < 1) return ret; synchronized (this) { Iterator iter = this.storage.iterator(); // find all elements to delete ... while (iter.hasNext() && (ret.countEntries<numOfEntries||numOfEntries<0)) { I_QueueEntry entry = (I_QueueEntry)iter.next(); currentSizeInBytes = entry.getSizeInBytes(); if ((ret.countBytes+currentSizeInBytes>=numOfBytes) && ret.countEntries>0L && numOfBytes>-1) break; // further specific breaks ... int prio = entry.getPriority(); if (minPrio < 0) { minPrio = prio; maxPrio = minPrio; } if (prio < minPrio) break; if (prio <= maxPrio) { ret.countBytes += currentSizeInBytes; ret.list.add(entry); ret.countEntries++; } } } return ret; } /** * @see I_Queue#peek(int, long) */ public ArrayList peek(int numOfEntries, long numOfBytes) throws XmlBlasterException { return genericPeek(numOfEntries, numOfBytes, 0, MAX_PRIO).list; } /** * @see I_Queue#peekSamePriority(int, long) */ public ArrayList peekSamePriority(int numOfEntries, long numOfBytes) throws XmlBlasterException { return genericPeek(numOfEntries, numOfBytes, -1, -1).list; } /** * @see I_Queue#peekWithLimitEntry(I_QueueEntry) * @deprecated */ public ArrayList peekWithLimitEntry(I_QueueEntry limitEntry) throws XmlBlasterException { if (limitEntry == null) return new ArrayList(); synchronized (this) { return new ArrayList(this.storage.headSet(limitEntry)); } } /** * @see I_Queue#removeWithLimitEntry(I_QueueEntry, boolean) */ public long removeWithLimitEntry(I_QueueEntry limitEntry, boolean inclusive) throws XmlBlasterException { long ret = 0L; if (limitEntry == null) return ret; synchronized (this) { SortedSet set = this.storage.headSet(limitEntry); ret = set.size(); this.storage.removeAll(set); if (inclusive) { if (this.storage.remove(limitEntry)) ret++; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Queue#getNumOfEntries() */ public long getNumOfEntries() { synchronized (this) { return this.storage.size(); } } /**i * @see I_Queue#getNumOfPersistentEntries() */ public long getNumOfPersistentEntries() { return this.numOfPersistentEntries; } /** * @see I_Queue#getMaxNumOfEntries() */ public long getMaxNumOfEntries() { return property.getMaxEntries(); } /** * @see I_Queue#getNumOfBytes() */ public long getNumOfBytes() { return this.sizeInBytes; } /** * @see I_Queue#getNumOfPersistentBytes() */ public long getNumOfPersistentBytes() { return this.persistentSizeInBytes; } /** * Gets the number of bytes by really reading (i.e. by scanning the whole * queue contents) the number of bytes of each single entry * @see I_Queue#getNumOfBytes() */ public long getSynchronizedNumOfBytes() { synchronized (this) { Iterator iter = this.storage.iterator(); long sum = 0L; while (iter.hasNext()) { sum += ((I_QueueEntry)(iter.next())).getSizeInBytes(); } return sum; } } /** * @see I_Queue#getMaxNumOfBytes() */ public long getMaxNumOfBytes() { return this.property.getMaxBytes(); } /** * @see I_Queue#removeRandom(I_Entry) */ public int removeRandom(I_Entry entry) throws XmlBlasterException { I_Entry[] arr = new I_Entry[1]; arr[0] = entry; if (removeRandom(arr)[0]) return 1; else return 0; } /** * @see I_Queue#removeRandom(I_Entry[]) */ public boolean[] removeRandom(I_Entry[] queueEntries) throws XmlBlasterException { if ((queueEntries == null) || (queueEntries.length == 0)) return new boolean[0]; boolean ret[] = new boolean[queueEntries.length]; ArrayList entriesToRemove = new ArrayList(); try { synchronized(this) { if (this.storage.size() == 0) return ret; // all entries are false /* Did not work with all virtual machines ... this.storage.removeAll(java.util.Arrays.asList(queueEntries)); */ for (int j=0; j<queueEntries.length; j++) { if (queueEntries[j] == null) continue; if (this.notifiedAboutAddOrRemove) { entriesToRemove.add(queueEntries[j]); } queueEntries[j].setStored(false); // tell the entry it has been removed from the storage ... if (this.storage.remove(queueEntries[j])) { ret[j] = true; I_Entry entry = queueEntries[j]; this.sizeInBytes -= entry.getSizeInBytes(); if (entry.isPersistent()) { this.persistentSizeInBytes -= entry.getSizeInBytes(); this.numOfPersistentEntries--; } } } } } finally { for (int i=0; i < entriesToRemove.size(); i++) { ((I_Entry)entriesToRemove.get(i)).removed(this.storageId); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * Currently NOT supported by I_Queue. */ public I_QueueEntry take() throws XmlBlasterException { ArrayList list = take(1, -1L); if (list == null || list.size() < 1) return null; return (I_QueueEntry)list.get(0); } /** * Currently NOT supported by I_Queue. */ public ArrayList take(int numOfEntries, long numOfBytes) throws XmlBlasterException { return takeWithPriority(numOfEntries, numOfBytes, 0, MAX_PRIO); } /** */ public ArrayList takeSamePriority(int numOfEntries, long numOfBytes) throws XmlBlasterException { return takeWithPriority(numOfEntries, numOfBytes, -1, -1); } /** * @see I_Queue */ public ArrayList takeWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { if (isShutdown) { log.warning("The queue is shutdown, no message access is possible."); if (log.isLoggable(Level.FINE)) Thread.dumpStack(); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The queue is shutdown, no message access is possible."); } ArrayList ret = null; ArrayList entriesToRemove = new ArrayList(); try { synchronized (this) { ret = genericPeek(numOfEntries, numOfBytes, minPriority, maxPriority).list; for (int i=0; i < ret.size(); i++) { I_QueueEntry entry = (I_QueueEntry)ret.get(i); if (this.notifiedAboutAddOrRemove) { entriesToRemove.add(entry); } entry.setStored(false); // tell the entry it has been removed from the storage ... if (this.storage.remove(entry)) { this.sizeInBytes -= entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries--; this.persistentSizeInBytes -= entry.getSizeInBytes(); } } } } } finally { for (int i=0; i < entriesToRemove.size(); i++) { ((I_Entry)entriesToRemove.get(i)).removed(this.storageId); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * Helper method to find out if still to retrieve entries in getAndDeleteLowest or not. */ private final boolean isInsideRange(int numEntries, int maxNumEntries, long numBytes, long maxNumBytes) { if (maxNumEntries < 0) { if (maxNumBytes <0L) return true; return numBytes < maxNumBytes; } // then maxNumEntries >= 0 if (maxNumBytes <0L) return numEntries < maxNumEntries; // then the less restrictive of both is used (since none is negative) return numEntries < maxNumEntries || numBytes < maxNumBytes; } /** * @see I_Queue#takeLowest(int, long, I_QueueEntry, boolean) */ public ArrayList takeLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { return takeOrPeekLowest(numOfEntries, numOfBytes, limitEntry, leaveOne, true); } /** * @see I_Queue#peekLowest(int, long, I_QueueEntry, boolean) */ public ArrayList peekLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { return takeOrPeekLowest(numOfEntries, numOfBytes, limitEntry, leaveOne, false); } /** * @see I_Queue#takeLowest(int, long, I_QueueEntry, boolean) */ private ArrayList takeOrPeekLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne, boolean doDelete) throws XmlBlasterException { ArrayList ret = null; ArrayList entriesToRemove = new ArrayList(); try { synchronized(this) { LinkedList list = new LinkedList(this.storage); ListIterator iter = list.listIterator(list.size());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -