📄 jdbcqueuecommontableplugin.java
字号:
catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if (ret == null) return 0L; return ret.countEntries; } /** * Removes the given entry. * @param dataId the unique id. It must be unique within the storage area * of the implementing queue. In other words, if the underlying * implementation is on RAM, then the storage area is the JVM, that * is the queue must be unique in the same JVM. If the queue is a * jdbc, the dataId is unique in the DB used. */ public int removeRandom(long dataId) throws XmlBlasterException { long[] args = new long[1]; args[0] = dataId; if (removeRandom(args)[0]) return 1; else return 0; } /** * Removes the given entries. * @param msgQueueEntry the entry to erase. */ public boolean[] removeRandom(long[] dataIdArray) throws XmlBlasterException { ArrayList list = this.manager.getEntries(getStorageId(), dataIdArray); return removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()])); } /** * @see I_Queue#removeRandom(I_Entry) */ public int removeRandom(I_Entry entry) throws XmlBlasterException { if (entry == null) return 0; long id = entry.getUniqueId(); long currentAmount = entry.getSizeInBytes(); long currentPersistentSize = 0L; long currentPersistentEntries = 0L; if (entry.isPersistent()) { currentPersistentSize += currentAmount; currentPersistentEntries = 1L; } int ret = 0; synchronized(this.modificationMonitor) { ret = this.manager.deleteEntry(getStorageId().getStrippedId(), id); if (ret > 0) { // then we need to retrieve the values this.numOfEntries--; this.numOfBytes -= currentAmount; this.numOfPersistentBytes -= currentPersistentSize; this.numOfPersistentEntries -= currentPersistentEntries; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Queue#removeRandom(I_Entry[]) */ public boolean[] removeRandom(I_Entry[] queueEntries) throws XmlBlasterException { if (queueEntries == null || queueEntries.length == 0) return new boolean[0]; long[] ids = new long[queueEntries.length]; long currentAmount = 0L; long currentPersistentSize = 0L; long currentPersistentEntries = 0L; for (int i=0; i < ids.length; i++) { ids[i] = queueEntries[i].getUniqueId(); currentAmount += queueEntries[i].getSizeInBytes(); if (queueEntries[i].isPersistent()) { currentPersistentSize += queueEntries[i].getSizeInBytes(); currentPersistentEntries++; } } boolean[] tmp = null; synchronized(this.modificationMonitor) { try {/* int[] tmp = this.manager.deleteEntriesBatch(getStorageId().getStrippedId(), ids); long sum = 0; for (int i=0; i < tmp.length; i++) { if (log.isLoggable(Level.FINE)) log.trace(ME, "removeRandom: entry '" + i + "' is '" + tmp[i]); ret[i] = tmp[i] > 0 || tmp[i] == -2; // !!! JDK 1.4 only: Statement.SUCCESS_NO_INFO = -2; if (ret[i]) sum++; }*/ tmp = this.manager.deleteEntries(getStorageId().getStrippedId(), ids); long sum = 0; for (int i=0; i < tmp.length; i++) { if (tmp[i]) sum++; } if (log.isLoggable(Level.FINE)) log.fine("randomRemove: the number of removed entries is '" + sum + "'"); this.numOfEntries -= sum; if ((int)sum != queueEntries.length) { // then we need to retrieve the values resetCounters(); // now it can be optimized since boolean[] is given back } else { this.numOfBytes -= currentAmount; this.numOfPersistentBytes -= currentPersistentSize; this.numOfPersistentEntries -= currentPersistentEntries; } } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return tmp; } /** * @see I_Queue#removeWithPriority(long, long, int, int) */ public long removeWithPriority(long numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { ArrayList array = this.peekWithPriority((int)numOfEntries, numOfBytes, minPriority, maxPriority); boolean ret[] = removeRandom((I_QueueEntry[])array.toArray(new I_QueueEntry[array.size()])); long count = 0L; for (int i=0; i < ret.length; i++) if (ret[i]) count++; return count; } /** * @see I_Queue#removeTransient() */ public int removeTransient() throws XmlBlasterException { int ret = 0; synchronized(this.modificationMonitor) { try { ret = this.manager.deleteAllTransient(getStorageId().getStrippedId()); this.numOfEntries -= ret; // not so performant but only called on init this.numOfBytes = -999L; this.numOfBytes = getNumOfBytes_(); } catch (XmlBlasterException ex) { resetCounters(); throw ex; } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * It returns the size of the queue. Note that this call will return the size * stored in cache. * In case this value is negative (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * If the log DUMP is set to true, then a refresh of the cache is done by every invocation * and if the cached value is different from the real value an error is written. * * @see I_Queue#getNumOfEntries() * @exception XmlBlasterException if number is not retrievable */ private final long getNumOfEntries_() throws XmlBlasterException { if (this.numOfEntries > -1L && !this.debug) return this.numOfEntries; synchronized (this.modificationMonitor) { long oldValue = this.numOfEntries; this.numOfEntries = this.manager.getNumOfEntries(getStorageId().getStrippedId()); if (this.debug) { if (oldValue != this.numOfEntries && oldValue != -999L) { // don't log if explicitly set the oldValue String txt = "getNumOfEntries: an inconsistency occured between the cached value and the real value of 'numOfEntries': it was '" + oldValue + "' but should have been '" + this.numOfEntries + "'"; throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME, txt + toXml("")); } } else if (log.isLoggable(Level.FINE)) log.fine("getNumOfEntries_ old (cached) value: '" + oldValue + "' new (real) value: '" + this.numOfEntries + "'"); return this.numOfEntries; } } /** * It returns the size of the queue. Note that this call will return the size * stored in cache. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @see I_Queue#getNumOfEntries() */ public long getNumOfEntries() { try { return getNumOfEntries_(); } catch (XmlBlasterException ex) { log.severe("getNumOfEntries, exception: " + ex.getMessage()); return this.numOfEntries; } } /** * It returns the number of persistent entries in the queue. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @param verbose If true we throw an exception on errors, if false we ignore the error silently * @see I_Queue#getNumOfPersistentEntries() */ private long getNumOfPersistentEntries_(boolean verbose) throws XmlBlasterException { if (this.numOfPersistentEntries > -1L && !this.debug) return this.numOfPersistentEntries; synchronized (this.modificationMonitor) { try { long oldValue = this.numOfPersistentEntries; this.numOfPersistentEntries = this.manager.getNumOfPersistents(getStorageId().getStrippedId()); if (this.debug) { if (oldValue != this.numOfPersistentEntries && oldValue != -999L) { // don't log if explicitly set the oldValue String txt = "getNumOfPersistentEntries: an inconsistency occured between the cached value and the real value of 'numOfPersistentEntries': it was '" + oldValue + "' but should have been '" + this.numOfPersistentEntries + "'"; throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME, txt + toXml("")); } } else if (log.isLoggable(Level.FINE)) log.fine("getNumOfPersistentEntries_ old (cached) value: '" + oldValue + "' new (real) value: '" + this.numOfPersistentEntries + "'"); return this.numOfPersistentEntries; } catch (XmlBlasterException ex) { if (verbose) { // If called from toXml() we need to suppress this exeption because we here call toXml() again throw ex; // the verbose flag is probably not needed anymore ... } return -1L; } } } /** * It returns the number of persistent entries in the queue. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @see I_Queue#getNumOfPersistentEntries() */ public long getNumOfPersistentEntries() { try { return getNumOfPersistentEntries_(true); } catch (XmlBlasterException ex) { log.severe("getNumOfEntries, exception: " + ex.getMessage()); return this.numOfPersistentEntries; } } /** * @see I_Queue#getMaxNumOfEntries() */ public long getMaxNumOfEntries() { return this.property.getMaxEntries(); } /** * It returns the size of the queue. Note that this call will return the size * stored in cache. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @see I_Queue#getNumOfBytes() */ private long getNumOfBytes_() throws XmlBlasterException { if (this.numOfBytes > -1L && !this.debug) return this.numOfBytes; synchronized (this.modificationMonitor) { long oldValue = this.numOfBytes; this.numOfBytes = this.manager.getNumOfBytes(getStorageId().getStrippedId()); if (this.debug) { if (oldValue != this.numOfBytes && oldValue != -999L) { // don't log if explicitly set the oldValue String txt = "getNumOfBytes: an inconsistency occured between the cached value and the real value of 'numOfPersistentBytes': it was '" + oldValue + "' but should have been '" + this.numOfBytes + "'"; throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME, txt + toXml("")); } } else if (log.isLoggable(Level.FINE)) log.fine("getNumOfBytes_ old (cached) value: '" + oldValue + "' new (real) value: '" + this.numOfBytes + "'"); return this.numOfBytes; } } /** * It returns the size of the queue. Note that this call will return the size * stored in cache. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @see I_Queue#getNumOfBytes() */ public long getNumOfBytes() { try { return getNumOfBytes_(); } catch (XmlBlasterException ex) { log.fine("getNumOfBytes, exception: " + ex.getMessage()); return this.numOfBytes; } } /** * It returns the number of persistent entries in the queue. * In case this value is -1L (which means a previous attempt to read from the * DB failed) it will synchronize against the DB by making a call to the DB. * If that fails it will return -1L. * * @param verbose If true we throw an exception on errors, if false we ignore the error silently * @see I_Queue#getNumOfPersistentBytes() */ private long getNumOfPersistentBytes_(boolean verbose) throws XmlBlasterException { if (this.numOfPersistentBytes > -1L && !this.debug) return this.numOfPersistentBytes; synchronized (this.modificationMonitor) { try { long oldValue = this.numOfPersistentBytes; this.numOfPersistentBytes = this.manager.getSizeOfPersistents(getStorageId().getStrippedId()); if (this.debug) { if (oldValue != this.numOfPersistentBytes && oldValue != -999L) { // don't log if explicitly set the oldValue String txt = "getNumOfPersistentBytes: an inconsistency occured between the cached value and the real value of 'numOfPersistentBytes': it was '" + oldValue + "' but should have been '" + this.numOfPersistentBytes + "'"; throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME, txt + toXml("")); } } else if (log.isLoggable(Level.FINE)) log.warning("getNumOfPersistentBytes_ old (cached) value: '" + oldValue + "' new (real) value: '" + this.numOfPersistentBytes + "'"); return this.numOfPersistentBytes; } catch (XmlBlasterException ex) { if (verbose) { // If called from toXml() we need to suppress this exeption because we here call toXml() again throw ex; // probably verbose is not needed anymore ... }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -