⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ramqueueplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      RamQueuePlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.ram;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_StorageSizeListener;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_QueueEntry;import org.xmlBlaster.util.queue.I_QueuePutListener;import org.xmlBlaster.util.queue.ReturnDataHolder;import org.xmlBlaster.util.queue.I_StoragePlugin;import org.xmlBlaster.util.queue.StorageSizeListenerHelper;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queue.I_StorageProblemListener;import java.util.Comparator;import java.util.ArrayList;import java.util.TreeSet;import java.util.SortedSet;import java.util.Iterator;import java.util.LinkedList;import java.util.ListIterator;import java.util.Properties;import java.io.OutputStream;/** * Queueing messages in RAM only, sorted after priority and timestamp * @see <a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html">The concurrent library</a> * @author xmlBlaster@marcelruff.info */public final class RamQueuePlugin implements I_Queue, I_StoragePlugin{   private String ME = "RamQueuePlugin";   private StorageId storageId;       // e.g. "history:/node/heron/12345"   private boolean notifiedAboutAddOrRemove = false;//   private BoundedPriorityQueue boundedPriorityQueue;   private TreeSet storage;   private QueuePropertyBase property;   private Global glob;   private static Logger log = Logger.getLogger(RamQueuePlugin.class.getName());   private I_QueuePutListener putListener;   private boolean isShutdown = false;   private MsgComparator comparator;   private final int MAX_PRIO = 9; // see PriorityEnum.MAX_PRIORITY   private long sizeInBytes = 0L;   private long persistentSizeInBytes = 0L;   private long numOfPersistentEntries = 0L;   private PluginInfo pluginInfo;   private StorageSizeListenerHelper storageSizeListenerHelper;      public RamQueuePlugin() {      this.storageSizeListenerHelper = new StorageSizeListenerHelper(this);   }      /**    * Is called after the instance is created.    * @param uniqueQueueId A unique name, allowing to create a unique name for a persistent store (e.g. file name)    *                "update:/node/heron/client/joe/2", "history:<oid>", "client:joe/2"    * @param userData For example a Properties object or a String[] args object passing the configuration data    *                 Here we expect a QueuePropertyBase instance    */   public void initialize(StorageId uniqueQueueId, Object userData) throws XmlBlasterException {      this.property = null;      setProperties(userData);            if (this.property != null && this.glob != null && this.glob.isServerSide() != this.property.getGlobal().isServerSide()) {         log.severe("Incompatible globals this.property.getGlobal().isServerSide()=" + this.property.getGlobal().isServerSide() + ": " + Global.getStackTraceAsString(null));      }      this.glob = this.property.getGlobal();      this.storageId = uniqueQueueId;      if (storageId == null || glob == null) {         Thread.dumpStack();         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "Illegal arguments in RamQueuePlugin constructor: storageId=" + storageId);      }      this.ME = "RamQueuePlugin-" + storageId.getId();      long maxEntries = property.getMaxEntries();      if (maxEntries > Integer.MAX_VALUE)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "initialize: The maximum number of messages is too big");      this.comparator = new MsgComparator();      this.storage = new TreeSet(this.comparator);      this.numOfPersistentEntries = 0L;      this.persistentSizeInBytes = 0L;      this.isShutdown = false;   }   /**    * Allows to overwrite properties which where passed on initialize().     * The properties which support hot configuration are depending on the used implementation    * <p>    * capacity is immutable, if you try to change a warning is logged    * </p>    */   public void setProperties(Object userData) throws XmlBlasterException {      if (userData == null) return;      QueuePropertyBase newProp;      try {         newProp = (QueuePropertyBase)userData;      }      catch(Throwable e) { // this.log is still null         throw XmlBlasterException.convert(this.glob, ME, "Can't configure queue, your properties are invalid", e); // glob is allowed to be null      }      this.property = newProp;   }   /**    * Access the current queue configuration    */   public Object getProperties() {      return this.property;   }   public boolean isTransient() {      return true;   }   public void setNotifiedAboutAddOrRemove(boolean notify) {      this.notifiedAboutAddOrRemove = notify;   }   public boolean isNotifiedAboutAddOrRemove() {      return this.notifiedAboutAddOrRemove;   }   /**    * @see I_Queue#addPutListener(I_QueuePutListener)    */   public void addPutListener(I_QueuePutListener l) {      if (l == null)         throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed");      if (this.putListener != null)         throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already");      this.putListener = l;   }   /**    * @see I_Queue#removePutListener(I_QueuePutListener)    */   public void removePutListener(I_QueuePutListener l) {      this.putListener = null;   }   /**    * Gets the references of the messages in the queue. Note that the data    * which is referenced here may be changed by other threads.    * @see I_Queue#getEntryReferences()    */   public long[] getEntryReferences() throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences() is not implemented");   }   /**    * Gets a copy of the entries (the messages) in the queue. If the queue    * is modified, this copy will not be affected. This method is useful for client browsing.    * THIS METHOD IS NOT IMPLEMENTED    * @throws XmlBlasterException always    */   public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntries() is not implemented");   }   public void finalize() {      if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collected");   }   /** For verbose logging */   public StorageId getStorageId() {      return storageId;   }   public void shutdown() {      int size = 0;      synchronized (this) {         if (log.isLoggable(Level.FINE)) log.fine("Entering shutdown(" + this.storage.size() + ")");         if (this.isShutdown) return;         this.isShutdown = true;         size = this.storage.size();      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      this.removeStorageSizeListener(null);      if (size > 0) {         String reason = "Shutting down RAM queue which contains " + size + " messages";         if (log.isLoggable(Level.FINE)) log.fine(reason);         //throw new XmlBlasterException(ME, reason);         //handleFailure !!!      }      if (log.isLoggable(Level.FINER)) {         log.finer("shutdown() of queue " + this.getStorageId() + " which contains " + size + "messages");      }            glob.getQueuePluginManager().cleanup(this);   }   public boolean isShutdown() {      return this.isShutdown;   }   /**    * Flush the queue    * @return The number of messages erased    */   public long clear() {      long ret = 0L;      I_QueueEntry[] entries = null;      try {         synchronized(this) {            ret = this.storage.size();               // Take a copy to avoid java.util.ConcurrentModificationException            entries = (I_QueueEntry[])this.storage.toArray(new I_QueueEntry[this.storage.size()]);               for (int ii=0; ii<entries.length; ii++) {               entries[ii].setStored(false);            }               this.storage.clear();            this.sizeInBytes = 0L;            this.persistentSizeInBytes = 0L;            this.numOfPersistentEntries = 0L;         }      }      finally {         if (this.notifiedAboutAddOrRemove) {            for (int ii=0; ii<entries.length; ii++) {               entries[ii].removed(this.storageId);            }         }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      return ret;   }   /**    * @see I_Queue#remove()    */   public int remove() throws XmlBlasterException {      return (int)remove(1, -1L);   }   /**    * @see I_Queue#remove(long, long)    */   public long remove(long numOfEntries, long numOfBytes)      throws XmlBlasterException   {      if (numOfEntries > Integer.MAX_VALUE)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries);      long size = 0;      ArrayList elementsToDelete = null;      try {         synchronized(this) {            ReturnDataHolder ret = this.genericPeek((int)numOfEntries, numOfBytes, 0, 9);            elementsToDelete = ret.list;            // count the persistent entries (and the persistent sizes)            for (int i=0; i < elementsToDelete.size(); i++) {               I_QueueEntry entry = (I_QueueEntry)elementsToDelete.get(i);               if (entry.isPersistent()) {                  this.numOfPersistentEntries--;                  this.persistentSizeInBytes -= entry.getSizeInBytes();               }               entry.setStored(false); // tell the entry it has been removed from the storage ...            }            this.storage.removeAll(elementsToDelete);            this.sizeInBytes -= ret.countBytes;            size = elementsToDelete.size();         }      }      finally {         if (this.notifiedAboutAddOrRemove && elementsToDelete != null) {            for (int i=0; i < elementsToDelete.size(); i++) {               ((I_Entry)elementsToDelete.get(i)).removed(this.storageId);            }         }      }     this.storageSizeListenerHelper.invokeStorageSizeListener();     return size;   }   /**    */   public long removeWithPriority(long numOfEntries, long numOfBytes, int minPriority, int maxPriority)      throws XmlBlasterException {      if (numOfEntries > Integer.MAX_VALUE)         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "remove: too many entries to remove " + numOfEntries);      ArrayList elementsToRemove = peekWithPriority((int)numOfEntries, numOfBytes, minPriority, maxPriority);      boolean[] ret = removeRandom((I_Entry[])elementsToRemove.toArray(new I_Entry[elementsToRemove.size()]));      long count = 0L;      for (int i=0; i < ret.length;i++) if (ret[i]) count++;      return count;   }   /**    * @see I_Queue#removeTransient()    */   public int removeTransient() throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeTransient() is not implemented");   }   /**    * @see I_Queue#peek()    */   public I_QueueEntry peek() {      if (getNumOfEntries() < 1) return null;      synchronized (this) {         return (I_QueueEntry)this.storage.first();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -