📄 jdbcqueuecommontableplugin.java
字号:
} } /** * Internally used for I_MapEntry and I_QueueEntry * @return true on success */ private boolean put(I_Entry entry) throws XmlBlasterException { boolean ret = false; synchronized (this.modificationMonitor) { String exTxt = null; if ((exTxt=spaceLeft(1, entry.getSizeInBytes())) != null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, exTxt); if (getNumOfBytes_() > getMaxNumOfBytes()) { throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, "put: the maximum number of bytes reached." + " Number of bytes=" + this.numOfBytes + " maxmimum number of bytes=" + getMaxNumOfBytes() + " status: " + this.toXml("")); } try { if (this.manager.addEntry(this.storageId.getStrippedId(), entry)) { this.numOfEntries++; this.numOfBytes += entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries++; this.numOfPersistentBytes += entry.getSizeInBytes(); } ret = true; } } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Queue#put(I_QueueEntry[], boolean) */ public void put(I_QueueEntry[] queueEntries, boolean ignorePutInterceptor) throws XmlBlasterException { XmlBlasterException ex0 = null; if (queueEntries == null) return; if ((this.putListener != null) &&(!ignorePutInterceptor)) { // Is an interceptor registered (and not bypassed) ? if (this.putListener.putPre(queueEntries) == false) return; } synchronized (this.modificationMonitor) { String exTxt = null; if ((exTxt=spaceLeft(queueEntries.length, /*calculateSizeInBytes(queueEntries)*/ 0L)) != null) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, exTxt); if (getNumOfBytes_() > getMaxNumOfBytes()) { throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_BYTES, ME, "put[]: the maximum number of bytes reached." + " Number of bytes=" + this.numOfBytes + " maxmimum number of bytes=" + getMaxNumOfBytes() + " status: " + this.toXml("")); } try { int[] help = this.manager.addEntries(this.storageId.getStrippedId(), queueEntries); for (int i=0; i < queueEntries.length; i++) { // boolean isProcessed = this.manager.addEntry(this.storageId.getStrippedId(), queueEntries[i]); boolean isProcessed = help[i] > 0 || help[i] == -2; // !!! JDK 1.4 only: Statement.SUCCESS_NO_INFO = -2; if (log.isLoggable(Level.FINE)) { log.fine("put(I_Entry[]) the entry nr. " + i + " returned '" + help[i] + "'"); } if (isProcessed) { this.numOfEntries++; this.numOfBytes += queueEntries[i].getSizeInBytes(); if (queueEntries[i].isPersistent()) { this.numOfPersistentEntries++; this.numOfPersistentBytes += queueEntries[i].getSizeInBytes(); } } } } catch (XmlBlasterException ex) { ex0 = ex; resetCounters(); } if (ex0 != null) throw ex0; } if (this.putListener != null && !ignorePutInterceptor) { this.putListener.putPost(queueEntries); } this.storageSizeListenerHelper.invokeStorageSizeListener(); } /** * Returns the unique ID of this queue */ public StorageId getStorageId() { return this.storageId; } /** * Currently not supported by I_Queue. */ public I_QueueEntry take() throws XmlBlasterException { // note that this method could be drastically improved // however it is unlikely to be used so I avoid that tuning now synchronized (this.modificationMonitor) { I_QueueEntry ret = this.peek(); this.removeRandom(ret.getUniqueId()); return ret; } } /** * @see I_Queue#takeWithPriority(int,long,int,int) */ public ArrayList takeWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { if (numOfEntries == 0) return new ArrayList(); synchronized (this.modificationMonitor) { ArrayList ret = this.peekWithPriority(numOfEntries, numOfBytes, minPriority, maxPriority); this.removeRandom( (I_QueueEntry[])ret.toArray(new I_QueueEntry[ret.size()]) ); return ret; } } /** * Currently not supported by I_Queue. */ public ArrayList take(int numOfEntries, long numOfBytes) throws XmlBlasterException { if (numOfEntries == 0) return new ArrayList(); ArrayList ret = null; I_EntryFilter entryFilter = null; synchronized(this.modificationMonitor) { try { ret = this.manager.getEntries(getStorageId(), numOfEntries, numOfBytes, entryFilter); long ids[] = new long[ret.size()]; for (int i=0; i < ids.length; i++) ids[i] = ((I_QueueEntry)ret.get(i)).getUniqueId(); boolean tmp[] = this.manager.deleteEntries(getStorageId().getStrippedId(), ids); for (int i=0; i < tmp.length; i++) { if (tmp[i]) this.numOfEntries--; } } finally { resetCounters(); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Queue#takeLowest(int, long, I_QueueEntry, boolean) */ public ArrayList takeLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { // I could change the concept here by just checking if an entry with the // given uniqueId is found the search algorithm should break. This would // increase performance. However this method is probably never called on // this particular implementation. long minUniqueId = 0L; int maxPriority = Integer.MAX_VALUE; if (limitEntry != null) { minUniqueId = limitEntry.getUniqueId(); maxPriority = limitEntry.getPriority(); } ReturnDataHolder ret = null; synchronized(this.modificationMonitor) { try { ret = this.manager.getAndDeleteLowest(getStorageId(), numOfEntries, numOfBytes, maxPriority, minUniqueId, leaveOne, true); this.numOfBytes -= ret.countBytes; this.numOfEntries -= ret.countEntries; this.numOfPersistentBytes = -999L; this.numOfPersistentBytes = getNumOfPersistentBytes_(true); this.numOfPersistentEntries = -999L; this.numOfPersistentEntries = getNumOfPersistentEntries_(true); } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if (ret == null) return null; return ret.list; } /** * @see I_Queue#peekLowest(int, long, I_QueueEntry, boolean) */ public ArrayList peekLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne) throws XmlBlasterException { long minUniqueId = 0L; int maxPriority = Integer.MAX_VALUE; if (limitEntry != null) { minUniqueId = limitEntry.getUniqueId(); maxPriority = limitEntry.getPriority(); } ReturnDataHolder ret = this.manager.getAndDeleteLowest(getStorageId(), numOfEntries, numOfBytes, maxPriority, minUniqueId, leaveOne, false); return ret.list; } /** * @see I_Queue#peek() */ public I_QueueEntry peek() throws XmlBlasterException { I_EntryFilter entryFilter = null; ArrayList ret = this.manager.getEntries(getStorageId(), 1, -1L, entryFilter); if (ret.size() < 1) return null; return (I_QueueEntry)ret.get(0); } /** * @see I_Queue#peek(int,long) */ public ArrayList peek(int numOfEntries, long numOfBytes) throws XmlBlasterException { if (numOfEntries == 0) return new ArrayList(); I_EntryFilter entryFilter = null; ArrayList ret = this.manager.getEntries(getStorageId(), numOfEntries, numOfBytes, entryFilter); return ret; } /** * @see I_Queue#peekSamePriority(int, long) */ public ArrayList peekSamePriority(int numOfEntries, long numOfBytes) throws XmlBlasterException { if (numOfEntries == 0) return new ArrayList(); ArrayList ret = this.manager.getEntriesBySamePriority(getStorageId(), numOfEntries, numOfBytes); return ret; } /** * @see I_Queue#peekWithPriority(int, long, int, int) */ public ArrayList peekWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { if (numOfEntries == 0) return new ArrayList(); I_EntryFilter entryFilter = null; ArrayList ret = this.manager.getEntriesByPriority(getStorageId(), numOfEntries, numOfBytes, minPriority, maxPriority, entryFilter); return ret; } /** * @see I_Queue#peekWithLimitEntry(I_QueueEntry) * @deprecated */ public ArrayList peekWithLimitEntry(I_QueueEntry limitEntry) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("peekWithLimitEntry called"); if (limitEntry == null) return new ArrayList(); return this.manager.getEntriesWithLimit(getStorageId(), limitEntry); } /** * @see I_Queue#removeWithLimitEntry(I_QueueEntry, boolean) */ public long removeWithLimitEntry(I_QueueEntry limitEntry, boolean inclusive) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("removeWithLimitEntry called"); if (limitEntry == null) return 0L; long ret = 0L; synchronized(this.modificationMonitor) { try { ret = this.manager.removeEntriesWithLimit(getStorageId(), limitEntry, inclusive); if (ret != 0) { // since we are not able to calculate the size in the cache we have to recalculate it resetCounters(); } } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * Removes the first element in the queue * This method does not block. * @return Number of messages erased (0 or 1) * @throws XmlBlasterException if the underlying implementation gets an exception. */ public int remove() throws XmlBlasterException { ReturnDataHolder ret = null; synchronized(this.modificationMonitor) { try { ret = this.manager.deleteFirstEntries(getStorageId().getStrippedId(), 1, -1L); this.numOfEntries -= (int)ret.countEntries; this.numOfBytes -= ret.countBytes; this.numOfPersistentBytes = -999L; this.numOfPersistentBytes = getNumOfPersistentBytes_(true); this.numOfPersistentEntries = -999L; this.numOfPersistentEntries = getNumOfPersistentEntries_(true); } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if (ret == null) return 0; return (int)ret.countEntries; } /** * Removes max numOfEntries messages. * This method does not block. * @param numOfEntries Erase num entries or less if less entries are available, -1 erases everything * @return Number of entries erased * @throws XmlBlasterException if the underlying implementation gets an exception. */ public long remove(long numOfEntries, long numOfBytes) throws XmlBlasterException { if (numOfEntries == 0) return 0L; ReturnDataHolder ret = null; synchronized(this.modificationMonitor) { try { ret = this.manager.deleteFirstEntries(getStorageId().getStrippedId(), numOfEntries, numOfBytes); this.numOfEntries -= (int)ret.countEntries; this.numOfBytes -= ret.countBytes; this.numOfPersistentBytes = -999L; this.numOfPersistentBytes = getNumOfPersistentBytes_(true); this.numOfPersistentEntries = -999L; this.numOfPersistentEntries = getNumOfPersistentEntries_(true); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -