📄 ramqueueplugin.java
字号:
/*------------------------------------------------------------------------------Name: RamQueuePlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.ram;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.ReturnDataHolder;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.util.Comparator;import java.util.ArrayList;import java.util.TreeSet;import java.util.SortedSet;import java.util.Iterator;import java.util.LinkedList;import java.util.ListIterator;import java.util.Properties;import java.io.OutputStream;/** * Queueing messages in RAM only, sorted after priority and timestamp * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">The concurrent library</a> * @author xmlBlaster@marcelruff.info */public final class RamQueuePlugin implements I_Queue, I_StoragePlugin{ private String ME = "RamQueuePlugin"; private StorageId storageId; // e.g. "history:/node/heron/12345" private boolean notifiedAboutAddOrRemove = false;// private BoundedPriorityQueue boundedPriorityQueue; private TreeSet storage; private QueuePropertyBase property; private Global glob; private static Logger log = Logger.getLogger(RamQueuePlugin.class.getName()); private I_QueuePutListener putListener; private boolean isShutdown = false; private MsgComparator comparator; private final int MAX_PRIO = 9; // see PriorityEnum.MAX_PRIORITY private long sizeInBytes = 0L; private long persistentSizeInBytes = 0L; private long numOfPersistentEntries = 0L; private PluginInfo pluginInfo; private StorageSizeListenerHelper storageSizeListenerHelper; public RamQueuePlugin() { this.storageSizeListenerHelper = new StorageSizeListenerHelper(this); } /** * Is called after the instance is created. * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name) * "update:/node/heron/client/joe/2", "history:<oid>", "client:joe/2" * @param userData For example a Properties object or a String[] args object passing the configuration data * Here we expect a QueuePropertyBase instance */ public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException { this.property = null; setProperties(userData); if (this.property != null && this.glob != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) { log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null)); } this.glob = this.property.getGlobal(); this.storageId = uniqueQueueId; if (storageId == null || glob == null) { Thread.dumpStack(); throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Illegal arguments in RamQueuePlugin constructor: storageId=" + storageId); } this.ME = "RamQueuePlugin-" + storageId.getId(); long maxEntries = property.getMaxEntries(); if (maxEntries > Integer.MAX_VALUE) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "initialize: The maximum number of messages is too big"); this.comparator = new MsgComparator(); this.storage = new TreeSet(this.comparator); this.numOfPersistentEntries = 0L; this.persistentSizeInBytes = 0L; this.isShutdown = false; } /** * Allows to overwrite properties which where passed on initialize(). * The properties which support hot configuration are depending on the used implementation * <p> * capacity is immutable, if you try to change a warning is logged * </p> */ public void setProperties(Object userData) throws XmlBlasterException { if (userData == null) return; QueuePropertyBase newProp; try { newProp = (QueuePropertyBase)userData; } catch(Throwable e) { // this.log is still null throw XmlBlasterException.convert(this.glob, ME, "Can't configure queue, your properties are invalid", e); // glob is allowed to be null } this.property = newProp; } /** * Access the current queue configuration */ public Object getProperties() { return this.property; } public boolean isTransient() { return true; } public void setNotifiedAboutAddOrRemove(boolean notify) { this.notifiedAboutAddOrRemove = notify; } public boolean isNotifiedAboutAddOrRemove() { return this.notifiedAboutAddOrRemove; } /** * @see I_Queue#addPutListener(I_QueuePutListener) */ public void addPutListener(I_QueuePutListener l) { if (l == null) throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed"); if (this.putListener != null) throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already"); this.putListener = l; } /** * @see I_Queue#removePutListener(I_QueuePutListener) */ public void removePutListener(I_QueuePutListener l) { this.putListener = null; } /** * Gets the references of the messages in the queue. Note that the data * which is referenced here may be changed by other threads. * @see I_Queue#getEntryReferences() */ public long[] getEntryReferences() throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences() is not implemented"); } /** * Gets a copy of the entries (the messages) in the queue. If the queue * is modified, this copy will not be affected. This method is useful for client browsing. * THIS METHOD IS NOT IMPLEMENTED * @throws XmlBlasterException always */ public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntries() is not implemented"); } public void finalize() { if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collected"); } /** For verbose logging */ public StorageId getStorageId() { return storageId; } public void shutdown() { int size = 0; synchronized (this) { if (log.isLoggable(Level.FINE)) log.fine("Entering shutdown(" + this.storage.size() + ")"); if (this.isShutdown) return; this.isShutdown = true; size = this.storage.size(); } this.storageSizeListenerHelper.invokeStorageSizeListener(); this.removeStorageSizeListener(null); if (size > 0) { String reason = "Shutting down RAM queue which contains " + size + " messages"; if (log.isLoggable(Level.FINE)) log.fine(reason); //throw new XmlBlasterException(ME, reason); //handleFailure !!! } if (log.isLoggable(Level.FINER)) { log.finer("shutdown() of queue " + this.getStorageId() + " which contains " + size + "messages"); } glob.getQueuePluginManager().cleanup(this); } public boolean isShutdown() { return this.isShutdown; } /** * Flush the queue * @return The number of messages erased */ public long clear() { long ret = 0L; I_QueueEntry[] entries = null; try { synchronized(this) { ret = this.storage.size(); // Take a copy to avoid java.util.ConcurrentModificationException entries = (I_QueueEntry[])this.storage.toArray(new I_QueueEntry[this.storage.size()]); for (int ii=0; ii<entries.length; ii++) { entries[ii].setStored(false); } this.storage.clear(); this.sizeInBytes = 0L; this.persistentSizeInBytes = 0L; this.numOfPersistentEntries = 0L; } } finally { if (this.notifiedAboutAddOrRemove) { for (int ii=0; ii<entries.length; ii++) { entries[ii].removed(this.storageId); } } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return ret; } /** * @see I_Queue#remove() */ public int remove() throws XmlBlasterException { return (int)remove(1, -1L); } /** * @see I_Queue#remove(long, long) */ public long remove(long numOfEntries, long numOfBytes) throws XmlBlasterException { if (numOfEntries > Integer.MAX_VALUE) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries); long size = 0; ArrayList elementsToDelete = null; try { synchronized(this) { ReturnDataHolder ret = this.genericPeek((int)numOfEntries, numOfBytes, 0, 9); elementsToDelete = ret.list; // count the persistent entries (and the persistent sizes) for (int i=0; i < elementsToDelete.size(); i++) { I_QueueEntry entry = (I_QueueEntry)elementsToDelete.get(i); if (entry.isPersistent()) { this.numOfPersistentEntries--; this.persistentSizeInBytes -= entry.getSizeInBytes(); } entry.setStored(false); // tell the entry it has been removed from the storage ... } this.storage.removeAll(elementsToDelete); this.sizeInBytes -= ret.countBytes; size = elementsToDelete.size(); } } finally { if (this.notifiedAboutAddOrRemove && elementsToDelete != null) { for (int i=0; i < elementsToDelete.size(); i++) { ((I_Entry)elementsToDelete.get(i)).removed(this.storageId); } } } this.storageSizeListenerHelper.invokeStorageSizeListener(); return size; } /** */ public long removeWithPriority(long numOfEntries, long numOfBytes, int minPriority, int maxPriority) throws XmlBlasterException { if (numOfEntries > Integer.MAX_VALUE) throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries); ArrayList elementsToRemove = peekWithPriority((int)numOfEntries, numOfBytes, minPriority, maxPriority); boolean[] ret = removeRandom((I_Entry[])elementsToRemove.toArray(new I_Entry[elementsToRemove.size()])); long count = 0L; for (int i=0; i < ret.length;i++) if (ret[i]) count++; return count; } /** * @see I_Queue#removeTransient() */ public int removeTransient() throws XmlBlasterException { throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeTransient() is not implemented"); } /** * @see I_Queue#peek() */ public I_QueueEntry peek() { if (getNumOfEntries() < 1) return null; synchronized (this) { return (I_QueueEntry)this.storage.first();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -