📄 ramqueueplugin.java
字号:
int count = 0; long currentSizeInBytes = 0L; long totalSizeInBytes = 0L; ret = new ArrayList(); // it leaves at least one entry in the list while (iter.hasPrevious()) { I_QueueEntry entry = (I_QueueEntry)iter.previous(); currentSizeInBytes = entry.getSizeInBytes(); if (!isInsideRange(count, numOfEntries, totalSizeInBytes, numOfBytes)) break; totalSizeInBytes += currentSizeInBytes; if (limitEntry != null && this.comparator.compare(limitEntry, entry) >= 0) break; ret.add(entry); count++; } if (leaveOne && this.storage.size() == ret.size()) ret.remove(ret.size()-1); if (doDelete) { for (int i=0; i < ret.size(); i++) { // this.storage.removeAll(ret); I_QueueEntry entry = (I_QueueEntry)ret.get(i); if (this.notifiedAboutAddOrRemove) { entriesToRemove.add(entry); } entry.setStored(false); // tell the entry it has been removed from the storage ... if (this.storage.remove(entry)) { this.sizeInBytes -= entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries--; this.persistentSizeInBytes -= entry.getSizeInBytes(); } } } } // this.sizeInBytes -= totalSizeInBytes; } } finally { for (int i=0; i < entriesToRemove.size(); i++) { ((I_Entry)entriesToRemove.get(i)).removed(this.storageId); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * Put a message into the queue, blocks if take thread blocks synchronize */ public void put(I_QueueEntry entry, boolean ignorePutInterceptor) throws XmlBlasterException { if (entry == null) return; if (isShutdown) { if (log.isLoggable(Level.FINE)) log.fine("The queue is shutdown, put() of message " + entry.getUniqueId() + " failed"); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The queue is shutdown, put() of message " + entry.getUniqueId() + " failed"); } if ((this.putListener != null) && (!ignorePutInterceptor)) { // Is an interceptor registered? if (this.putListener.putPre(entry) == false) return; } if (getNumOfEntries() > property.getMaxEntries()) { // Allow superload one time only String reason = "Queue overflow (number of entries), " + property.getMaxEntries() + " messages are in queue, try increasing '" + this.property.getPropName("maxEntries") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } if (this.getNumOfBytes() > property.getMaxBytes()) { // Allow superload one time only String reason = "Queue overflow, " + this.getNumOfBytes() + " bytes are in queue, try increasing '" + this.property.getPropName("maxBytes") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } synchronized(this) { if (!this.storage.contains(entry)) { if (this.storage.add(entry)) { entry.setStored(true); this.sizeInBytes += entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries++; this.persistentSizeInBytes += entry.getSizeInBytes(); } if (this.notifiedAboutAddOrRemove) { entry.added(this.storageId); } } } else { log.severe("Ignoring IDENTICAL uniqueId=" + entry.getUniqueId()); Thread.dumpStack(); } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if (this.putListener != null && !ignorePutInterceptor) { this.putListener.putPost(entry); } } /** * Put messages into the queue, blocks if take thread blocks synchronize */ public void put(I_QueueEntry[] msgArr, boolean ignorePutInterceptor) throws XmlBlasterException { if (msgArr == null) return; //if (log.isLoggable(Level.FINER)) log.call(ME, "Entering put(" + msgArr.length + ")"); if (isShutdown) { if (log.isLoggable(Level.FINE)) log.fine("The queue is shutdown, put() of " + msgArr.length + " messages failed"); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The queue is shutdown, put() of " + msgArr.length + " messages failed"); } // delegate put? if ((this.putListener != null) && (!ignorePutInterceptor)) { // Is an interceptor registered and it is not bypassed if (this.putListener.putPre(msgArr) == false) return; } if (getNumOfEntries() > property.getMaxEntries()) { // Allow superload one time only String reason = "Queue overflow (num of entries), " + property.getMaxEntries() + " messages are in queue, try increasing '" + this.property.getPropName("maxEntries") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason+toXml()); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } if (this.getNumOfBytes() > property.getMaxBytes()) { // Allow superload one time only String reason = "Queue overflow, " + this.getNumOfBytes() + " bytes are in queue, try increasing '" + this.property.getPropName("maxBytes") + "' on client login."; if (log.isLoggable(Level.FINE)) log.fine(reason+toXml()); throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason); } synchronized(this) {// this.storage.addAll(java.util.Arrays.asList(msgArr)); for (int i=0; i < msgArr.length; i++) { I_QueueEntry entry = msgArr[i]; if (!this.storage.contains(entry)) { if (this.storage.add(entry)) { entry.setStored(true); this.sizeInBytes += entry.getSizeInBytes(); if (entry.isPersistent()) { this.numOfPersistentEntries++; this.persistentSizeInBytes += entry.getSizeInBytes(); } if (this.notifiedAboutAddOrRemove) { entry.added(this.storageId); } } } } } this.storageSizeListenerHelper.invokeStorageSizeListener(); if (this.putListener != null && !ignorePutInterceptor) { this.putListener.putPost(msgArr); } } /** * Dump state of this object into a XML ASCII string. */ public String toXml() { return toXml((String)null); } /** * Dump state of this object into a XML ASCII string. * <br> * @param extraOffset indenting of tags for nice output * @return internal state of RamQueuePlugin as a XML ASCII string */ public String toXml(String extraOffset) { StringBuffer sb = new StringBuffer(256); if (extraOffset == null) extraOffset = ""; String offset = Constants.OFFSET + extraOffset; sb.append(offset).append("<RamQueuePlugin id='").append(getStorageId().getId()); sb.append("' type='").append(getType()); sb.append("' version='").append(getVersion()); sb.append("' numOfEntries='").append(getNumOfEntries()); sb.append("' numOfBytes='").append(getNumOfBytes()); sb.append("' numOfPersistentEntries='").append(getNumOfPersistentEntries()); sb.append("' numOfPersistentBytes='").append(getNumOfPersistentBytes()); sb.append("'>"); sb.append(property.toXml(extraOffset+Constants.INDENT)); sb.append(offset).append("</RamQueuePlugin>"); return sb.toString(); } /** * @see I_Queue#removeHead(I_QueueEntry) */ public long removeHead(I_QueueEntry toEntry) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeHead() is not implemented"); } /** * destroys silently all the resources associated to this queue. */ public void destroy() throws XmlBlasterException { synchronized (this) { this.storage.clear(); } this.shutdown(); this.property = null; } /** * @return a human readable usage help string */ public String usage() { return "no usage"; } /** * Enforced by I_Plugin * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, PluginInfo) */ public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) { this.glob = glob; this.pluginInfo = pluginInfo; } /** * Enforced by I_Plugin * @return "RAM" */ public String getType() { return "RAM"; } /** * Enforced by I_Plugin * @return "1.0" */ public String getVersion() { return "1.0"; } /** * Enforced by I_StoragePlugin * @return the pluginInfo object. */ public PluginInfo getInfo() { return this.pluginInfo; } /** * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#registerStorageProblemListener(I_StorageProblemListener) */ public boolean registerStorageProblemListener(I_StorageProblemListener listener) { return false; } /** * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#unRegisterStorageProblemListener(I_StorageProblemListener) */ public boolean unRegisterStorageProblemListener(I_StorageProblemListener listener) { return false; } /** * @see I_Queue#addStorageSizeListener(I_StorageSizeListener) */ public void addStorageSizeListener(I_StorageSizeListener listener) { this.storageSizeListenerHelper.addStorageSizeListener(listener); } /** * @see I_Queue#removeStorageSizeListener(I_StorageSizeListener) */ public void removeStorageSizeListener(I_StorageSizeListener listener) { this.storageSizeListenerHelper.removeStorageSizeListener(listener); } /** * @see I_Queue#hasStorageSizeListener(I_StorageSizeListener) */ public boolean hasStorageSizeListener(I_StorageSizeListener listener) { return this.storageSizeListenerHelper.hasStorageSizeListener(listener); } /** * @see I_Storage#getStorageSizeListeners() */ public I_StorageSizeListener[] getStorageSizeListeners() { return storageSizeListenerHelper.getStorageSizeListeners(); } /** * @see I_Queue#embeddedObjectsToXml(OutputStream, Properties) */ public long embeddedObjectsToXml(OutputStream out, Properties props) { log.warning("Sorry, dumping transient entries is not implemented"); return 0; }}/** * Sorts the messages * <ol> * <li>Priority</li> * <li>Timestamp</li> * </ol> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) */final class MsgComparator implements Comparator{ /** * Comparing the longs directly is 20% faster than having a * String compound key */ public int compare(Object o1, Object o2) { I_QueueEntry d1 = (I_QueueEntry)o1; I_QueueEntry d2 = (I_QueueEntry)o2; return d1.compare(d2); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -