⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ramqueueplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            int count = 0;            long currentSizeInBytes = 0L;            long totalSizeInBytes = 0L;            ret = new ArrayList();            // it leaves at least one entry in the list            while (iter.hasPrevious()) {               I_QueueEntry entry = (I_QueueEntry)iter.previous();               currentSizeInBytes = entry.getSizeInBytes();               if (!isInsideRange(count, numOfEntries, totalSizeInBytes, numOfBytes)) break;               totalSizeInBytes += currentSizeInBytes;               if (limitEntry != null && this.comparator.compare(limitEntry, entry) >= 0) break;               ret.add(entry);               count++;            }            if (leaveOne && this.storage.size() == ret.size()) ret.remove(ret.size()-1);            if (doDelete) {               for (int i=0; i < ret.size(); i++) {                  // this.storage.removeAll(ret);                  I_QueueEntry entry =  (I_QueueEntry)ret.get(i);                  if (this.notifiedAboutAddOrRemove) {                     entriesToRemove.add(entry);                  }                  entry.setStored(false); // tell the entry it has been removed from the storage ...                  if (this.storage.remove(entry)) {                     this.sizeInBytes -= entry.getSizeInBytes();                     if (entry.isPersistent()) {                        this.numOfPersistentEntries--;                        this.persistentSizeInBytes -= entry.getSizeInBytes();                     }                  }               }            }            // this.sizeInBytes -= totalSizeInBytes;         }      }      finally {         for (int i=0; i < entriesToRemove.size(); i++) {            ((I_Entry)entriesToRemove.get(i)).removed(this.storageId);         }      }            this.storageSizeListenerHelper.invokeStorageSizeListener();      return ret;   }   /**    * Put a message into the queue, blocks if take thread blocks synchronize    */   public void put(I_QueueEntry entry, boolean ignorePutInterceptor)      throws XmlBlasterException {      if (entry == null) return;      if (isShutdown) {         if (log.isLoggable(Level.FINE)) log.fine("The queue is shutdown, put() of message " + entry.getUniqueId() + " failed");         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The queue is shutdown, put() of message " + entry.getUniqueId() + " failed");      }      if ((this.putListener != null) && (!ignorePutInterceptor)) {         // Is an interceptor registered?         if (this.putListener.putPre(entry) == false)            return;      }      if (getNumOfEntries() > property.getMaxEntries()) { // Allow superload one time only         String reason = "Queue overflow (number of entries), " + property.getMaxEntries() +                         " messages are in queue, try increasing '" +                         this.property.getPropName("maxEntries") + "' on client login.";         if (log.isLoggable(Level.FINE)) log.fine(reason);         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason);      }      if (this.getNumOfBytes() > property.getMaxBytes()) { // Allow superload one time only         String reason = "Queue overflow, " + this.getNumOfBytes() +                         " bytes are in queue, try increasing '" +                          this.property.getPropName("maxBytes") + "' on client login.";         if (log.isLoggable(Level.FINE)) log.fine(reason);         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason);      }      synchronized(this) {         if (!this.storage.contains(entry)) {            if (this.storage.add(entry)) {               entry.setStored(true);               this.sizeInBytes += entry.getSizeInBytes();               if (entry.isPersistent()) {                  this.numOfPersistentEntries++;                  this.persistentSizeInBytes += entry.getSizeInBytes();               }               if (this.notifiedAboutAddOrRemove) {                  entry.added(this.storageId);               }            }         }         else {            log.severe("Ignoring IDENTICAL uniqueId=" + entry.getUniqueId());            Thread.dumpStack();         }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      if (this.putListener != null && !ignorePutInterceptor) {         this.putListener.putPost(entry);      }   }   /**    * Put messages into the queue, blocks if take thread blocks synchronize    */   public void put(I_QueueEntry[] msgArr, boolean ignorePutInterceptor)      throws XmlBlasterException {      if (msgArr == null) return;      //if (log.isLoggable(Level.FINER)) log.call(ME, "Entering put(" + msgArr.length + ")");      if (isShutdown) {         if (log.isLoggable(Level.FINE)) log.fine("The queue is shutdown, put() of " + msgArr.length + " messages failed");         throw new XmlBlasterException(glob, ErrorCode.INTERNAL_UNKNOWN, ME, "The queue is shutdown, put() of " + msgArr.length + " messages failed");      }      // delegate put?      if ((this.putListener != null) && (!ignorePutInterceptor)) {         // Is an interceptor registered and it is not bypassed         if (this.putListener.putPre(msgArr) == false)            return;      }      if (getNumOfEntries() > property.getMaxEntries()) { // Allow superload one time only         String reason = "Queue overflow (num of entries), " + property.getMaxEntries() +                  " messages are in queue, try increasing '" + this.property.getPropName("maxEntries") + "' on client login.";         if (log.isLoggable(Level.FINE)) log.fine(reason+toXml());         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason);      }      if (this.getNumOfBytes() > property.getMaxBytes()) { // Allow superload one time only         String reason = "Queue overflow, " + this.getNumOfBytes() + " bytes are in queue, try increasing '" +                         this.property.getPropName("maxBytes") + "' on client login.";         if (log.isLoggable(Level.FINE)) log.fine(reason+toXml());         throw new XmlBlasterException(glob, ErrorCode.RESOURCE_OVERFLOW_QUEUE_ENTRIES, ME, reason);      }      synchronized(this) {//         this.storage.addAll(java.util.Arrays.asList(msgArr));         for (int i=0; i < msgArr.length; i++) {            I_QueueEntry entry = msgArr[i];            if (!this.storage.contains(entry)) {               if (this.storage.add(entry)) {                  entry.setStored(true);                  this.sizeInBytes += entry.getSizeInBytes();                  if (entry.isPersistent()) {                     this.numOfPersistentEntries++;                     this.persistentSizeInBytes += entry.getSizeInBytes();                  }                  if (this.notifiedAboutAddOrRemove) {                     entry.added(this.storageId);                  }               }            }         }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      if (this.putListener != null && !ignorePutInterceptor) {         this.putListener.putPost(msgArr);      }   }      /**    * Dump state of this object into a XML ASCII string.    */   public String toXml() {      return toXml((String)null);   }   /**    * Dump state of this object into a XML ASCII string.    * <br>    * @param extraOffset indenting of tags for nice output    * @return internal state of RamQueuePlugin as a XML ASCII string    */   public String toXml(String extraOffset) {      StringBuffer sb = new StringBuffer(256);      if (extraOffset == null) extraOffset = "";      String offset = Constants.OFFSET + extraOffset;      sb.append(offset).append("<RamQueuePlugin id='").append(getStorageId().getId());      sb.append("' type='").append(getType());      sb.append("' version='").append(getVersion());      sb.append("' numOfEntries='").append(getNumOfEntries());      sb.append("' numOfBytes='").append(getNumOfBytes());      sb.append("' numOfPersistentEntries='").append(getNumOfPersistentEntries());      sb.append("' numOfPersistentBytes='").append(getNumOfPersistentBytes());      sb.append("'>");      sb.append(property.toXml(extraOffset+Constants.INDENT));      sb.append(offset).append("</RamQueuePlugin>");      return sb.toString();   }   /**    * @see I_Queue#removeHead(I_QueueEntry)    */   public long removeHead(I_QueueEntry toEntry) throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "removeHead() is not implemented");   }   /**    * destroys silently all the resources associated to this queue.    */   public void destroy() throws XmlBlasterException {      synchronized (this) {         this.storage.clear();      }      this.shutdown();      this.property = null;   }   /**    * @return a human readable usage help string    */   public String usage() {      return "no usage";   }   /**    * Enforced by I_Plugin    * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global, PluginInfo)    */   public void init(org.xmlBlaster.util.Global glob, PluginInfo pluginInfo) {      this.glob = glob;      this.pluginInfo = pluginInfo;   }   /**    * Enforced by I_Plugin    * @return "RAM"    */   public String getType() { return "RAM"; }   /**    * Enforced by I_Plugin    * @return "1.0"    */   public String getVersion() { return "1.0"; }   /**    * Enforced by I_StoragePlugin    * @return the pluginInfo object.    */   public PluginInfo getInfo() { return this.pluginInfo; }   /**    * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#registerStorageProblemListener(I_StorageProblemListener)    */   public boolean registerStorageProblemListener(I_StorageProblemListener listener) {      return false;   }   /**    * @see org.xmlBlaster.util.queue.I_StorageProblemNotifier#unRegisterStorageProblemListener(I_StorageProblemListener)    */   public boolean unRegisterStorageProblemListener(I_StorageProblemListener listener) {      return false;   }   /**    * @see I_Queue#addStorageSizeListener(I_StorageSizeListener)    */   public void addStorageSizeListener(I_StorageSizeListener listener) {      this.storageSizeListenerHelper.addStorageSizeListener(listener);   }      /**    * @see I_Queue#removeStorageSizeListener(I_StorageSizeListener)    */   public void removeStorageSizeListener(I_StorageSizeListener listener) {      this.storageSizeListenerHelper.removeStorageSizeListener(listener);   }      /**    * @see I_Queue#hasStorageSizeListener(I_StorageSizeListener)    */   public boolean hasStorageSizeListener(I_StorageSizeListener listener) {      return this.storageSizeListenerHelper.hasStorageSizeListener(listener);   }   /**    * @see I_Storage#getStorageSizeListeners()    */   public I_StorageSizeListener[] getStorageSizeListeners() {      return storageSizeListenerHelper.getStorageSizeListeners();   }   /**    * @see I_Queue#embeddedObjectsToXml(OutputStream, Properties)    */   public long embeddedObjectsToXml(OutputStream out, Properties props) {      log.warning("Sorry, dumping transient entries is not implemented");      return 0;   }}/** * Sorts the messages * <ol> *   <li>Priority</li> *   <li>Timestamp</li> * </ol> * @see org.xmlBlaster.util.queuemsg.MsgQueueEntry#compare(I_QueueEntry) */final class MsgComparator implements Comparator{   /**    * Comparing the longs directly is 20% faster than having a    * String compound key    */   public int compare(Object o1, Object o2) {      I_QueueEntry d1 = (I_QueueEntry)o1;      I_QueueEntry d2 = (I_QueueEntry)o2;      return d1.compare(d2);   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -