⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cachequeueinterceptorplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
      this.property = newProp;      this.transientQueue.setProperties(createRamCopy((QueuePropertyBase)userData));      if (this.persistentQueue != null) this.persistentQueue.setProperties(userData);   }   // JMX   public String getPropertyStr() {      return (this.property == null) ? "" : this.property.toXml();   }   /**    * Access the current queue configuration    */   public Object getProperties() {      return this.property;   }   public void setNotifiedAboutAddOrRemove(boolean notify) {      this.notifiedAboutAddOrRemove = notify;   }   public boolean isNotifiedAboutAddOrRemove() {      return this.notifiedAboutAddOrRemove;   }   /**    * @see I_Queue#addPutListener(I_QueuePutListener)    */   synchronized public void addPutListener(I_QueuePutListener l) {      if (l == null)         throw new IllegalArgumentException(ME + ": addPustListener(null) is not allowed");      if (this.putListener != null)         throw new IllegalArgumentException(ME + ": addPustListener() failed, there is a listener registered already");      this.putListener = l;   }   /**    * @see I_Queue#removePutListener(I_QueuePutListener)    */   synchronized public void removePutListener(I_QueuePutListener l) {      this.putListener = null;   }   /**    * returns the persistent queue (null if no one defined)    */   public I_Queue getPersistentQueue() {      return this.persistentQueue;   }   /**    * returns the transient queue (null if no one defined)    */   public I_Queue getTransientQueue() {      return this.transientQueue;   }   /**    * Gets the references of the entries in the queue. Note that the data    * which is referenced here may be changed by other threads.    */   public long[] getEntryReferences() throws XmlBlasterException {      // currently not implemented      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "getEntryReferences not implemented");   }   /**    * @see I_Queue#getEntries(I_EntryFilter)    */   public ArrayList getEntries(I_EntryFilter entryFilter) throws XmlBlasterException {           if (this.persistentQueue == null) return new ArrayList();           return this.persistentQueue.getEntries(entryFilter);   }   /**    * @see I_Queue#put(I_QueueEntry, boolean)    */   public void put(I_QueueEntry queueEntry, boolean ignorePutInterceptor)      throws XmlBlasterException {      I_QueueEntry[] entries = new I_QueueEntry[1];      entries[0] = queueEntry;      put(entries, ignorePutInterceptor);   }   /**    * All entries are stored into the transient queue. All persistent messages are    * stored also in the persistent queue. The exceeding size in the transient    * queue is calculated. If it is positive it means we need to swap. The    * overflowing messages are taken from the ram queue. The volatile between    * them are stored in the persistent storage (since the persistent ones have    * been previously stored).    *     * @see I_Queue#put(I_QueueEntry[], boolean)    */   public void put(I_QueueEntry[] queueEntries, boolean ignorePutInterceptor)      throws XmlBlasterException {      if (queueEntries == null || queueEntries.length < 1) return;      if ((this.putListener != null) && (!ignorePutInterceptor)) {         // Is an interceptor registered (and not bypassed) ?         if (this.putListener.putPre(queueEntries) == false)            return; // supress adding entries to queue (bypasses the queue)      }      synchronized (this) {         checkEntriesAvailable(this, 0L, true, "first check in put"); // throws XmlBlasterException if no space left         checkSpaceAvailable(this, 0L, true,  "first check in put"); // throws XmlBlasterException if no space left                 long sizeOfEntries = 0L;         // separate persistent from transient messages and store the persistents in persistence         if (isPersistenceAvailable()) {            ArrayList persistentsFromEntries = new ArrayList();            long sizeOfPersistents = 0L;            long numOfPersistents = 0L;                    for (int i=0; i < queueEntries.length; i++) {               if (queueEntries[i].isPersistent()) {                  persistentsFromEntries.add(queueEntries[i]);                  sizeOfPersistents += queueEntries[i].getSizeInBytes();                  numOfPersistents++;               }               else sizeOfEntries += queueEntries[i].getSizeInBytes();            }            sizeOfEntries += sizeOfPersistents;                    if (persistentsFromEntries.size() > 0) {               try {                  this.persistentQueue.put((I_QueueEntry[])persistentsFromEntries.toArray(new I_QueueEntry[persistentsFromEntries.size()]), ignorePutInterceptor);               }               catch (XmlBlasterException ex) {                  log.severe(ME+"put: an error occured when writing to the persistent queue: " + persistentsFromEntries.size() + " persistent entries will temporarly be handled as transient. Is the DB up and running ? " + ex.getMessage() + "state "  + this.toXml(""));                  // should an exception be rethrown here ? No because it should be possible to work even if no persistence available               }               catch (Throwable ex) {                  log.severe(ME+"put: an error occured when writing to the persistent queue: " + persistentsFromEntries.size() + " persistent entries will temporarly be handled as transient. Is the DB up and running ? " + ex.toString() + "state "  + this.toXml(""));                  ex.printStackTrace();               }            }         }                         synchronized(this.peekSync) {                    // put all messages on transient queue            this.transientQueue.put(queueEntries, ignorePutInterceptor);            if (isPersistenceAvailable()) { // if no persistence available let RAM overflow one time               // handle swapping (if any)               // TODO swap only if bigger than max entries (not same as max entries)               long exceedingSize = -checkSpaceAvailable(this.transientQueue, 0L, false, "second check in put");               long exceedingEntries = -checkEntriesAvailable(this.transientQueue, 0L, false, "second check in put");               if ( (exceedingSize >= 0L && this.persistentQueue.getMaxNumOfBytes() > this.transientQueue.getMaxNumOfBytes()) ||                     (exceedingEntries >= 0L && this.persistentQueue.getMaxNumOfEntries() > this.transientQueue.getMaxNumOfEntries())) {                  if (log.isLoggable(Level.FINE)) log.fine(ME+"Swapping. Exceeding size (in bytes): " + exceedingSize + " exceeding entries: " + exceedingEntries + " state: " + toXml(""));                              ArrayList transients = null;                  try {                     ArrayList swaps = null;                     boolean needsLoading = false;                     if (this.transientQueue.getNumOfEntries() == 0)                        swaps = this.transientQueue.takeLowest((int)exceedingEntries, exceedingSize, null, true);                     else {                        swaps = this.transientQueue.takeLowest(queueEntries.length, sizeOfEntries, null, true);                        needsLoading = true;                     }                     if (log.isLoggable(Level.FINE)) {                        log.fine(ME+"Swapping: moving '" + swaps.size() + "' entries from transient queue to persistent queue: exceedingEntries='" + exceedingEntries + "' and exceedingSize='" + exceedingSize + "'");                     }                     // get the transients                     transients = new ArrayList();                     for (int i=0; i < swaps.size(); i++) {                        I_QueueEntry entry = (I_QueueEntry)swaps.get(i);                        if (!entry.isPersistent()) {                           transients.add(entry);                        }                     }                     if (transients.size() > 0)                        this.persistentQueue.put((I_QueueEntry[])transients.toArray(new I_QueueEntry[transients.size()]), ignorePutInterceptor);                     if (needsLoading) loadFromPersistence();                  }                  catch (XmlBlasterException ex) {                     log.severe(ME+"put: an error occured when swapping: " +  transients.size() + ". Is the DB up and running ? " + ex.getMessage() + " state: " + toXml(""));                     ex.printStackTrace();                  }                  catch (Throwable ex) {                     log.severe(ME+"put: an error occured when swapping: " +  transients.size() + ". Is the DB up and running ? " + ex.toString());                     ex.printStackTrace();                  }               }            }         } // end of peekSync here ...      } // end of synchronized here ...      // these must be outside the synchronized ...      if (this.notifiedAboutAddOrRemove) {         for(int i=0; i<queueEntries.length; i++)            try {               queueEntries[i].added(this.queueId);            }            catch (Throwable ex) {               log.severe(ME+"put: an error occured when notifying : " + ex.toString());               ex.printStackTrace();            }      }      this.storageSizeListenerHelper.invokeStorageSizeListener();      if ((this.putListener != null) && (!ignorePutInterceptor)) {         this.putListener.putPost(queueEntries);      }   }   // JMX   public String getQueueName() {      return getStorageId().getStrippedId();   }   /**    * Returns the unique ID of this queue    */   public StorageId getStorageId() {      return this.queueId;   }   /**    * @see I_Queue#takeWithPriority(int,long,int,int)    */   public ArrayList takeWithPriority(int numOfEntries, long numOfBytes, int minPriority, int maxPriority)      throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "takeWithPriority not implemented");      // if (this.notifiedAboutAddOrRemove) {}   }   private final boolean hasTransientsSwapped() {      return this.persistentQueue.getNumOfPersistentEntries() != this.persistentQueue.getNumOfEntries();   }   private final boolean isPersistenceAvailable() {      return this.persistentQueue != null && this.isConnected;   }         private final boolean hasUncachedEntries() {      return hasTransientsSwapped() ||              this.persistentQueue.getNumOfPersistentEntries() != this.transientQueue.getNumOfPersistentEntries();   }   /**    * Aware: peekLowest is not implemented!!    * @see I_Queue#peekLowest(int, long, I_QueueEntry, boolean)    */   public ArrayList peekLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne)      throws XmlBlasterException {      throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME, "peekLowest is not implemented");   }   /**    * Aware: takeLowest for more than one entry is not implemented!!    * @see I_Queue#takeLowest(int, long, I_QueueEntry, boolean)    */   public ArrayList takeLowest(int numOfEntries, long numOfBytes, I_QueueEntry limitEntry, boolean leaveOne)      throws XmlBlasterException {      ArrayList list = null;      boolean doNotify = false;      try {         synchronized(this) {            boolean handlePersistents = isPersistenceAvailable() && hasUncachedEntries();            if ( handlePersistents ) {                // swapping               try {                  list = this.persistentQueue.takeLowest(numOfEntries, numOfBytes, limitEntry, leaveOne);                  doNotify = true;               }               catch (Throwable ex) {                  handlePersistents = false;                  log.severe(ME+"takeLowest: exception occured when taking the lowest entry from the persistent queue: " + ex.toString());                  ex.printStackTrace();               }                          if (handlePersistents) {                  if (list.size() > 1) {                     throw new XmlBlasterException(glob, ErrorCode.INTERNAL_NOTIMPLEMENTED, ME,                              "takeLowest for more than one entry is not implemented");                  }                             long num = 0L;                  boolean[] hlp = this.transientQueue.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()]));                  for (int i=0; i < hlp.length; i++) if (hlp[i]) num++;                  if (num > 0L) {                     if (log.isLoggable(Level.FINE)) log.fine(ME+"Didn't expect message " + ((I_Entry)list.get(0)).getLogId() + " in transient store." +                                " If the database was temporary unavailable this is possible " + this.toXml(""));                  }               }            }            // 'else' is no good here since it could have changed due to the exception ...            if ( !handlePersistents) {               list = this.transientQueue.takeLowest(numOfEntries, numOfBytes, limitEntry, leaveOne);               doNotify = true;               if (isPersistenceAvailable() && list.size() > 0 && this.persistentQueue.getNumOfEntries() > 0) {                  boolean durableFound = false;                  for(int i=0; i<list.size(); i++) {                     if (((I_Entry)list.get(i)).isPersistent()) {                        durableFound = true;                        break;                     }                  }                  if (durableFound) {                     this.persistentQueue.removeRandom((I_Entry[])list.toArray(new I_Entry[list.size()]));                  }               }            }                 } // end of syncrhonized       }      finally {         if (doNotify) {            if (this.notifiedAboutAddOrRemove) {               for(int i=0; i<list.size(); i++) ((I_Entry)list.get(i)).removed(this.queueId);            }         }         this.storageSizeListenerHelper.invokeStorageSizeListener();      }      return list;   }   // JMX   public String peekStr() throws Exception {      try {         I_QueueEntry entry = peek();         return (entry == null) ? "No entry found" :                     entry.getLogId() + " - " + entry.getSizeInBytes() + " bytes - " +                     ((entry.isPersistent()) ? "persistent" : "transient") +                     " prio=" + entry.getPriority() +                      " " + entry.getEmbeddedType(); // no toXml() available ??       }      catch(XmlBlasterException e) {         throw new Exception(e);      }   }   // JMX   public String[] peekEntries(int numOfEntries) throws Exception {      if (numOfEntries == 0)         return new String[] { "Please pass number of messages to peak" };      try {         ArrayList list = peek(numOfEntries, -1L);         if (list == null || list.size()<1)            return new String[] { "No entry found" };         String[] ret = new String[list.size()];         for (int i=0; i<list.size(); i++) {            I_QueueEntry entry = (I_QueueEntry)list.get(i);            // no toXml() available ??             ret[i] = entry.getLogId() + " - " + entry.getSizeInBytes() + " bytes - " +                     ((entry.isPersistent()) ? "persistent" : "transient") +                     " prio=" + entry.getPriority() +                      " " + entry.getEmbeddedType(); // no toXml() available ??          }         return ret;      }      catch (XmlBlasterException e) {         throw new Exception(e);      }   }   /**    * @see I_Queue#peek()    */   public I_QueueEntry peek() throws XmlBlasterException {      synchronized(this.peekSync) {         return this.transientQueue.peek();      }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -